diff --git a/Makefile b/Makefile index 9100dc0..d03a8e6 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ __default__: @echo "Please specify a target to make" -GEN=python3 -m grpc_tools.protoc -I. --python_out=. --grpclib_python_out=. +GEN=python3 -m grpc_tools.protoc -I. --python_out=. --grpclib_python_out=. --pyi_out=. GENERATED=*{_pb2.py,_grpc.py,.pyi} clean: diff --git a/marznode/backends/base.py b/marznode/backends/base.py index c14ab96..6654072 100644 --- a/marznode/backends/base.py +++ b/marznode/backends/base.py @@ -8,6 +8,19 @@ class VPNBackend(ABC): + backend_type: str + config_format: int + + @property + @abstractmethod + def version(self) -> str | None: + raise NotImplementedError + + @property + @abstractmethod + def running(self) -> bool: + raise NotImplementedError + @abstractmethod def contains_tag(self, tag: str) -> bool: raise NotImplementedError @@ -35,3 +48,15 @@ def get_logs(self, include_buffer: bool) -> AsyncIterator: @abstractmethod async def get_usages(self): raise NotImplementedError + + @abstractmethod + def list_inbounds(self): + raise NotImplementedError + + @abstractmethod + def get_config(self): + raise NotImplementedError + + @abstractmethod + def save_config(self, config: str): + raise NotImplementedError diff --git a/marznode/backends/hysteria2/_config.py b/marznode/backends/hysteria2/_config.py index 37359f5..c6a39ea 100644 --- a/marznode/backends/hysteria2/_config.py +++ b/marznode/backends/hysteria2/_config.py @@ -48,9 +48,13 @@ def __init__( self._inbound.update({"path": obfs_password, "header_type": obfs_type}) def register_inbounds(self, storage: BaseStorage): - inbound = self._inbound - storage.register_inbound( - Inbound(tag=inbound["tag"], protocol=inbound["protocol"], config=inbound) + storage.register_inbound(self.get_inbound()) + + def get_inbound(self): + return Inbound( + tag=self._inbound["tag"], + protocol=self._inbound["protocol"], + config=self._inbound, ) def render(self): diff --git a/marznode/backends/hysteria2/_runner.py b/marznode/backends/hysteria2/_runner.py index 66d6357..ccc3672 100644 --- a/marznode/backends/hysteria2/_runner.py +++ b/marznode/backends/hysteria2/_runner.py @@ -7,6 +7,8 @@ import yaml from anyio import BrokenResourceError, ClosedResourceError, create_memory_object_stream +from marznode.backends.hysteria2._utils import get_version + logger = logging.getLogger(__name__) @@ -17,7 +19,8 @@ def __init__(self, executable_path: str): self._snd_streams = [] self._logs_buffer = deque(maxlen=100) self._capture_task = None - atexit.register(lambda: self.stop() if self.started else None) + self.version = get_version(executable_path) + atexit.register(lambda: self.stop() if self.running else None) async def start(self, config: dict): with tempfile.NamedTemporaryFile( @@ -36,11 +39,11 @@ async def start(self, config: dict): asyncio.create_task(self.__capture_process_logs()) def stop(self): - if self.started: + if self.running: self._process.terminate() @property - def started(self): + def running(self): return self._process and self._process.returncode is None async def __capture_process_logs(self): @@ -50,9 +53,6 @@ async def __capture_process_logs(self): async def capture_stream(stream): while True: output = await stream.readline() - if output == b"": - """break in case of eof""" - return for stm in self._snd_streams: try: await stm.send(output) @@ -60,6 +60,10 @@ async def capture_stream(stream): self._snd_streams.remove(stm) continue self._logs_buffer.append(output) + if output == b"": + """break in case of eof""" + logger.warning("Hysteria has stopped") + return await asyncio.gather( capture_stream(self._process.stderr), capture_stream(self._process.stdout) diff --git a/marznode/backends/hysteria2/_utils.py b/marznode/backends/hysteria2/_utils.py new file mode 100644 index 0000000..b7cda83 --- /dev/null +++ b/marznode/backends/hysteria2/_utils.py @@ -0,0 +1,18 @@ +import re +import subprocess + + +def get_version(hysteria_path: str) -> str | None: + """ + get xray version by running its executable + :param hysteria_path: + :return: xray version + """ + cmd = [hysteria_path, "version"] + output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode() + pattern = r"Version:\s*v(\d+\.\d+\.\d+)" + match = re.search(pattern, output) + if match: + return match.group(1) + else: + return None diff --git a/marznode/backends/hysteria2/interface.py b/marznode/backends/hysteria2/interface.py index 6cf4c7b..59c2f6d 100644 --- a/marznode/backends/hysteria2/interface.py +++ b/marznode/backends/hysteria2/interface.py @@ -1,10 +1,11 @@ +import asyncio import json import logging from secrets import token_hex -from typing import AsyncIterator, Any +from typing import AsyncIterator import aiohttp -from aiohttp import web +from aiohttp import web, ClientConnectorError from marznode.backends.base import VPNBackend from marznode.backends.hysteria2._config import HysteriaConfig @@ -18,37 +19,67 @@ class HysteriaBackend(VPNBackend): - def __init__(self, executable_path: str, storage: BaseStorage): + backend_type = "hysteria2" + config_format = 2 + + def __init__(self, executable_path: str, config_path: str, storage: BaseStorage): + self._app_runner = None self._executable_path = executable_path self._storage = storage - self._inbounds = ["hysteria2"] + self._inbound_tags = ["hysteria2"] + self._inbounds = list() self._users = {} self._auth_site = None self._runner = Hysteria(self._executable_path) self._stats_secret = None self._stats_port = None + self._config_path = config_path + self._restart_lock = asyncio.Lock() + + @property + def running(self) -> bool: + return self._runner.running + + @property + def version(self): + return self._runner.version def contains_tag(self, tag: str) -> bool: return bool(tag == "hysteria2") - async def start(self, config_path: str) -> None: + def list_inbounds(self) -> list: + return self._inbounds + + def get_config(self) -> str: + with open(self._config_path) as f: + return f.read() + + def save_config(self, config: str) -> None: + with open(self._config_path, "w") as f: + f.write(config) + + async def start(self, config: str | None = None) -> None: + if config is None: + with open(self._config_path) as f: + config = f.read() + else: + self.save_config(config) api_port = find_free_port() self._stats_port = find_free_port() self._stats_secret = token_hex(16) if self._auth_site: - await self._auth_site.stop() + await self._app_runner.cleanup() app = web.Application() app.router.add_post("/", self._auth_callback) - app_runner = web.AppRunner(app) - await app_runner.setup() + self._app_runner = web.AppRunner(app) + await self._app_runner.setup() - self._auth_site = web.TCPSite(app_runner, "127.0.0.1", api_port) + self._auth_site = web.TCPSite(self._app_runner, "127.0.0.1", api_port) await self._auth_site.start() - with open(config_path) as f: - config = f.read() cfg = HysteriaConfig(config, api_port, self._stats_port, self._stats_secret) cfg.register_inbounds(self._storage) + self._inbounds = [cfg.get_inbound()] await self._runner.start(cfg.render()) async def stop(self): @@ -56,9 +87,13 @@ async def stop(self): self._storage.remove_inbound("hysteria2") self._runner.stop() - async def restart(self, backend_config: Any) -> None: - await self.stop() - await self.start(backend_config) + async def restart(self, backend_config: str | None) -> None: + await self._restart_lock.acquire() + try: + await self.stop() + await self.start(backend_config) + finally: + self._restart_lock.release() async def add_user(self, user: User, inbound: Inbound) -> None: password = generate_password(user.key) @@ -88,9 +123,12 @@ async def get_usages(self): url = "http://127.0.0.1:" + str(self._stats_port) + "/traffic?clear=1" headers = {"Authorization": self._stats_secret} - async with aiohttp.ClientSession() as session: - async with session.get(url, headers=headers) as response: - data = await response.json() + try: + async with aiohttp.ClientSession() as session: + async with session.get(url, headers=headers) as response: + data = await response.json() + except ClientConnectorError: + data = {} usages = {} for user_identifier, usage in data.items(): uid = int(user_identifier.split(".")[0]) diff --git a/marznode/backends/xray/_config.py b/marznode/backends/xray/_config.py index b813e76..7520350 100644 --- a/marznode/backends/xray/_config.py +++ b/marznode/backends/xray/_config.py @@ -29,10 +29,8 @@ def __init__( super().__init__(config) self.inbounds = [] - self.inbounds_by_protocol = {} self.inbounds_by_tag = {} # self._fallbacks_inbound = self.get_inbound(XRAY_FALLBACKS_INBOUND_TAG) - self._addr_clients_by_tag = {} self._resolve_inbounds() self._apply_api() @@ -72,7 +70,7 @@ def _apply_api(self): def _resolve_inbounds(self): for inbound in self["inbounds"]: if ( - inbound["protocol"].lower() + inbound.get("protocol", "").lower() not in { "vmess", "trojan", @@ -83,12 +81,6 @@ def _resolve_inbounds(self): ): continue - if not inbound.get("settings"): - inbound["settings"] = {} - if not inbound["settings"].get("clients"): - inbound["settings"]["clients"] = [] - self._addr_clients_by_tag[inbound["tag"]] = inbound["settings"]["clients"] - settings = { "tag": inbound["tag"], "protocol": inbound["protocol"], @@ -123,23 +115,12 @@ def _resolve_inbounds(self): if inbound["protocol"] == "vless" and net == "tcp": settings["flow"] = XRAY_VLESS_REALITY_FLOW - try: - settings["pbk"] = tls_settings["publicKey"] - except KeyError: - pvk = tls_settings.get("privateKey") - if not pvk: - raise ValueError( - f"You need to provide privateKey in realitySettings of {inbound['tag']}" - ) - x25519 = get_x25519(XRAY_EXECUTABLE_PATH, pvk) - settings["pbk"] = x25519["public_key"] - - try: - settings["sid"] = tls_settings.get("shortIds")[0] - except (IndexError, TypeError): - raise ValueError( - f"You need to define at least one shortID in realitySettings of {inbound['tag']}" - ) + pvk = tls_settings.get("privateKey") + + x25519 = get_x25519(XRAY_EXECUTABLE_PATH, pvk) + settings["pbk"] = x25519["public_key"] + + settings["sid"] = tls_settings.get("shortIds", [""])[0] if net == "tcp": header = net_settings.get("header", {}) @@ -178,28 +159,15 @@ def _resolve_inbounds(self): self.inbounds.append(settings) self.inbounds_by_tag[inbound["tag"]] = settings - try: - self.inbounds_by_protocol[inbound["protocol"]].append(settings) - except KeyError: - self.inbounds_by_protocol[inbound["protocol"]] = [settings] - - def get_inbound(self, tag) -> dict: - for inbound in self["inbounds"]: - if inbound["tag"] == tag: - return inbound - - def get_outbound(self, tag) -> dict: - for outbound in self["outbounds"]: - if outbound["tag"] == tag: - return outbound - def register_inbounds(self, storage: BaseStorage): - inbounds = [ + for inbound in self.list_inbounds(): + storage.register_inbound(inbound) + + def list_inbounds(self) -> list[Inbound]: + return [ Inbound(tag=i["tag"], protocol=i["protocol"], config=i) for i in self.inbounds_by_tag.values() ] - for inbound in inbounds: - storage.register_inbound(inbound) def to_json(self, **json_kwargs): return json.dumps(self, **json_kwargs) diff --git a/marznode/backends/xray/_runner.py b/marznode/backends/xray/_runner.py index 13930fc..e939486 100644 --- a/marznode/backends/xray/_runner.py +++ b/marznode/backends/xray/_runner.py @@ -3,9 +3,11 @@ import asyncio import atexit import logging +import re from collections import deque from anyio import create_memory_object_stream, ClosedResourceError, BrokenResourceError +from anyio.streams.memory import MemoryObjectReceiveStream from ._config import XrayConfig from ._utils import get_version @@ -28,10 +30,10 @@ def __init__(self, executable_path: str, assets_path: str): self._logs_buffer = deque(maxlen=100) self._env = {"XRAY_LOCATION_ASSET": assets_path} - atexit.register(lambda: self.stop() if self.started else None) + atexit.register(lambda: self.stop() if self.running else None) async def start(self, config: XrayConfig): - if self.started is True: + if self.running is True: raise RuntimeError("Xray is started already") if config.get("log", {}).get("logLevel") in ("none", "error"): @@ -51,16 +53,22 @@ async def start(self, config: XrayConfig): await self._process.stdin.wait_closed() logger.info("Xray core %s started", self.version) + logs_stm = self.get_logs_stm() asyncio.create_task(self.__capture_process_logs()) + async for line in logs_stm: + if line == b"" or re.match( + r".*\[Warning] core: Xray \d+\.\d+\.\d+ started", line.decode() + ): # either start or die + logs_stm.close() + return def stop(self): """stops xray if it is started""" - if not self.started: + if not self.running: return self._process.terminate() self._process = None - logger.warning("Xray core stopped") async def restart(self, config: XrayConfig): """restart xray""" @@ -69,7 +77,7 @@ async def restart(self, config: XrayConfig): try: self.restarting = True - logger.warning("Restarting Xray core...") + logger.warning("Restarting Xray core") self.stop() await self.start(config) finally: @@ -82,9 +90,6 @@ async def __capture_process_logs(self): async def capture_stream(stream): while True: output = await stream.readline() - if output == b"": - """break in case of eof""" - return for stm in self._snd_streams: try: await stm.send(output) @@ -92,12 +97,16 @@ async def capture_stream(stream): self._snd_streams.remove(stm) continue self._logs_buffer.append(output) + if output == b"": + """break in case of eof""" + logger.warning("Xray stopped/died") + return await asyncio.gather( capture_stream(self._process.stderr), capture_stream(self._process.stdout) ) - def get_logs_stm(self): + def get_logs_stm(self) -> MemoryObjectReceiveStream: new_snd_stm, new_rcv_stm = create_memory_object_stream() self._snd_streams.append(new_snd_stm) return new_rcv_stm @@ -106,8 +115,7 @@ def get_buffer(self): """makes a copy of the buffer, so it could be read multiple times the buffer is never cleared in case logs from xray's exit are useful""" return self._logs_buffer.copy() - # return [line for line in self._logs_buffer] @property - def started(self): + def running(self): return self._process and self._process.returncode is None diff --git a/marznode/backends/xray/_utils.py b/marznode/backends/xray/_utils.py index 5d11985..ae57610 100644 --- a/marznode/backends/xray/_utils.py +++ b/marznode/backends/xray/_utils.py @@ -7,15 +7,15 @@ def get_version(xray_path: str) -> str | None: """ - get xray version by running it's executable + get xray version by running its executable :param xray_path: :return: xray version """ cmd = [xray_path, "version"] - output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode("utf-8") + output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode() match = re.match(r"^Xray (\d+\.\d+\.\d+)", output) if match: - return match.groups()[0] + return match.group(1) return None diff --git a/marznode/backends/xray/interface.py b/marznode/backends/xray/interface.py index 5a8b8c4..5877999 100644 --- a/marznode/backends/xray/interface.py +++ b/marznode/backends/xray/interface.py @@ -1,10 +1,10 @@ """What a vpn server should do""" import asyncio +import json import logging from collections import defaultdict -from marznode import config from marznode.backends.base import VPNBackend from marznode.backends.xray._config import XrayConfig from marznode.backends.xray._runner import XrayCore @@ -23,21 +23,58 @@ class XrayBackend(VPNBackend): - def __init__(self, storage: BaseStorage): + backend_type = "xray" + config_format = 1 + + def __init__( + self, + executable_path: str, + assets_path: str, + config_path: str, + storage: BaseStorage, + ): self._config = None self._inbound_tags = set() + self._inbounds = list() self._api = None - self._runner = XrayCore(config.XRAY_EXECUTABLE_PATH, config.XRAY_ASSETS_PATH) + self._runner = XrayCore(executable_path, assets_path) self._storage = storage + self._config_path = config_path + self._restart_lock = asyncio.Lock() + + @property + def running(self) -> bool: + return self._runner.running + + @property + def version(self): + return self._runner.version def contains_tag(self, tag: str) -> bool: return tag in self._inbound_tags - async def start(self, backend_config: str): + def list_inbounds(self) -> list: + return self._inbounds + + def get_config(self) -> str: + with open(self._config_path) as f: + return f.read() + + def save_config(self, config: str) -> None: + with open(self._config_path, "w") as f: + f.write(config) + + async def start(self, backend_config: str | None = None): + if backend_config is None: + with open(self._config_path) as f: + backend_config = f.read() + else: + self.save_config(json.dumps(json.loads(backend_config), indent=2)) xray_api_port = find_free_port() self._config = XrayConfig(backend_config, api_port=xray_api_port) self._config.register_inbounds(self._storage) self._inbound_tags = {i["tag"] for i in self._config.inbounds} + self._inbounds = list(self._config.list_inbounds()) self._api = XrayAPI("127.0.0.1", xray_api_port) await self._runner.start(self._config) await asyncio.sleep(0.15) @@ -47,13 +84,18 @@ def stop(self): for tag in self._inbound_tags: self._storage.remove_inbound(tag) self._inbound_tags = set() + self._inbounds = set() async def restart(self, backend_config: str | None) -> list[Inbound] | None: # xray_config = backend_config if backend_config else self._config - if not backend_config: - return await self._runner.restart(self._config) - self.stop() - await self.start(backend_config) + await self._restart_lock.acquire() + try: + if not backend_config: + return await self._runner.restart(self._config) + self.stop() + await self.start(backend_config) + finally: + self._restart_lock.release() async def add_user(self, user: User, inbound: Inbound): email = f"{user.id}.{user.username}" @@ -71,6 +113,8 @@ async def add_user(self, user: User, inbound: Inbound): await self._api.add_inbound_user(inbound.tag, user_account) except (EmailExistsError, TagNotFoundError): raise + except OSError: + logger.warning("user addition requested when xray api is down") async def remove_user(self, user: User, inbound: Inbound): email = f"{user.id}.{user.username}" @@ -78,9 +122,14 @@ async def remove_user(self, user: User, inbound: Inbound): await self._api.remove_inbound_user(inbound.tag, email) except (EmailNotFoundError, TagNotFoundError): raise + except OSError: + logger.warning("user removal requested when xray api is down") async def get_usages(self, reset: bool = True) -> dict[int, int]: - api_stats = await self._api.get_users_stats(reset=reset) + try: + api_stats = await self._api.get_users_stats(reset=reset) + except OSError: + api_stats = [] stats = defaultdict(int) for stat in api_stats: uid = int(stat.name.split(".")[0]) diff --git a/marznode/marznode.py b/marznode/marznode.py index 4e56066..ef18295 100644 --- a/marznode/marznode.py +++ b/marznode/marznode.py @@ -17,6 +17,8 @@ XRAY_CONFIG_PATH, HYSTERIA_ENABLED, XRAY_ENABLED, + XRAY_EXECUTABLE_PATH, + XRAY_ASSETS_PATH, ) from marznode.service import MarzService from marznode.storage import MemoryStorage @@ -46,15 +48,22 @@ async def main(): ) storage = MemoryStorage() - backends = [] + backends = dict() if XRAY_ENABLED: - xray_backend = XrayBackend(storage) - await xray_backend.start(XRAY_CONFIG_PATH) - backends.append(xray_backend) + xray_backend = XrayBackend( + XRAY_EXECUTABLE_PATH, + XRAY_ASSETS_PATH, + XRAY_CONFIG_PATH, + storage, + ) + await xray_backend.start() + backends.update({"xray": xray_backend}) if HYSTERIA_ENABLED: - hysteria_backend = HysteriaBackend(HYSTERIA_EXECUTABLE_PATH, storage) - await hysteria_backend.start(HYSTERIA_CONFIG_PATH) - backends.append(hysteria_backend) + hysteria_backend = HysteriaBackend( + HYSTERIA_EXECUTABLE_PATH, HYSTERIA_CONFIG_PATH, storage + ) + await hysteria_backend.start() + backends.update({"hysteria2": hysteria_backend}) server = Server([MarzService(storage, backends), Health()]) diff --git a/marznode/service/service.proto b/marznode/service/service.proto index f7b1f64..9ed9332 100644 --- a/marznode/service/service.proto +++ b/marznode/service/service.proto @@ -4,15 +4,28 @@ package marznode; message Empty {} +enum ConfigFormat { + PLAIN = 0; + JSON = 1; + YAML = 2; +} + +message Backend { + string name = 1; + optional string type = 2; + optional string version = 3; + repeated Inbound inbounds = 4; +} + +message BackendsResponse { + repeated Backend backends = 1; +} + message Inbound { string tag = 1; optional string config = 2; } -message InboundsResponse { - repeated Inbound inbounds = 1; -} - message User { uint32 id = 1; string username = 2; @@ -40,20 +53,32 @@ message LogLine { string line = 1; } -message XrayConfig { +message BackendConfig { string configuration = 1; + ConfigFormat config_format = 2; +} + +message BackendLogsRequest { + string backend_name = 1; + bool include_buffer = 2; +} + +message RestartBackendRequest { + string backend_name = 1; + optional BackendConfig config = 2; } -message XrayLogsRequest { - bool include_buffer = 1; +message BackendStats { + bool running = 1; } service MarzService { rpc SyncUsers(stream UserData) returns (Empty); rpc RepopulateUsers(UsersData) returns (Empty); - rpc FetchInbounds(Empty) returns (InboundsResponse); + rpc FetchBackends(Empty) returns (BackendsResponse); rpc FetchUsersStats(Empty) returns (UsersStats); - rpc FetchXrayConfig(Empty) returns (XrayConfig); - rpc RestartXray(XrayConfig) returns (InboundsResponse); - rpc StreamXrayLogs(XrayLogsRequest) returns(stream LogLine); + rpc FetchBackendConfig(Backend) returns (BackendConfig); + rpc RestartBackend(RestartBackendRequest) returns (Empty); + rpc StreamBackendLogs(BackendLogsRequest) returns (stream LogLine); + rpc GetBackendStats(Backend) returns (BackendStats); } diff --git a/marznode/service/service.py b/marznode/service/service.py index 14aa080..5fa41fd 100644 --- a/marznode/service/service.py +++ b/marznode/service/service.py @@ -7,22 +7,28 @@ import logging from collections import defaultdict +from grpclib import GRPCError, Status from grpclib.server import Stream from marznode.backends.base import VPNBackend from marznode.storage import BaseStorage from .service_grpc import MarzServiceBase +from .service_pb2 import ( + BackendConfig as BackendConfig_pb2, + Backend, + BackendLogsRequest, + RestartBackendRequest, + BackendStats, +) from .service_pb2 import ( UserData, UsersData, Empty, - InboundsResponse, + BackendsResponse, Inbound, UsersStats, LogLine, ) -from .service_pb2 import XrayConfig as XrayConfig_pb2 -from .. import config from ..models import User, Inbound as InboundModel logger = logging.getLogger(__name__) @@ -31,12 +37,12 @@ class MarzService(MarzServiceBase): """Add/Update/Delete users based on calls from the client""" - def __init__(self, storage: BaseStorage, backends: list[VPNBackend]): + def __init__(self, storage: BaseStorage, backends: dict[str, VPNBackend]): self._backends = backends self._storage = storage def _resolve_tag(self, inbound_tag: str) -> VPNBackend: - for backend in self._backends: + for backend in self._backends.values(): if backend.contains_tag(inbound_tag): return backend raise @@ -92,20 +98,28 @@ async def _update_user(self, user_data: UserData): await self._add_user(storage_user, added_inbounds) await self._storage.update_user_inbounds(storage_user, new_inbounds) - async def SyncUsers(self, stream: "Stream[UserData," "Empty]") -> None: + async def SyncUsers(self, stream: Stream[UserData, Empty]) -> None: async for user_data in stream: await self._update_user(user_data) - async def FetchInbounds( + async def FetchBackends( self, - stream: Stream[Empty, InboundsResponse], + stream: Stream[Empty, BackendsResponse], ) -> None: await stream.recv_message() - stored_inbounds = await self._storage.list_inbounds() - inbounds = [ - Inbound(tag=i.tag, config=json.dumps(i.config)) for i in stored_inbounds + backends = [ + Backend( + name=name, + type=backend.backend_type, + version=backend.version, + inbounds=[ + Inbound(tag=i.tag, config=json.dumps(i.config)) + for i in backend.list_inbounds() + ], + ) + for name, backend in self._backends.items() ] - await stream.send_message(InboundsResponse(inbounds=inbounds)) + await stream.send_message(BackendsResponse(backends=backends)) async def RepopulateUsers( self, @@ -124,7 +138,7 @@ async def FetchUsersStats(self, stream: Stream[Empty, UsersStats]) -> None: await stream.recv_message() all_stats = defaultdict(int) - for backend in self._backends: + for backend in self._backends.values(): stats = await backend.get_usages() for user, usage in stats.items(): @@ -137,25 +151,39 @@ async def FetchUsersStats(self, stream: Stream[Empty, UsersStats]) -> None: ] await stream.send_message(UsersStats(users_stats=user_stats)) - async def StreamXrayLogs(self, stream: Stream[Empty, LogLine]) -> None: + async def StreamBackendLogs( + self, stream: Stream[BackendLogsRequest, LogLine] + ) -> None: req = await stream.recv_message() - async for line in self._backends[0].get_logs(req.include_buffer): + if req.backend_name not in self._backends: + raise + async for line in self._backends[req.backend_name].get_logs(req.include_buffer): await stream.send_message(LogLine(line=line)) - async def FetchXrayConfig(self, stream: Stream[Empty, XrayConfig_pb2]) -> None: - await stream.recv_message() - with open(config.XRAY_CONFIG_PATH) as f: - content = f.read() - await stream.send_message(XrayConfig_pb2(configuration=content)) - - async def RestartXray( - self, stream: Stream[XrayConfig_pb2, InboundsResponse] + async def FetchBackendConfig( + self, stream: Stream[Backend, BackendConfig_pb2] + ) -> None: + req = await stream.recv_message() + backend = self._backends[req.name] + config = backend.get_config() + await stream.send_message( + BackendConfig_pb2(configuration=config, config_format=backend.config_format) + ) + + async def RestartBackend( + self, stream: Stream[RestartBackendRequest, Empty] ) -> None: message = await stream.recv_message() - await self._storage.flush_users() - inbounds = await self._backends[0].restart(message.configuration) - logger.debug(inbounds) - await stream.send_message(InboundsResponse(inbounds=[])) - with open(config.XRAY_CONFIG_PATH, "w") as f: - f.write(json.dumps(json.loads(message.configuration), indent=2)) + await self._backends[message.backend_name].restart(message.config.configuration) + await stream.send_message(Empty()) + + async def GetBackendStats(self, stream: Stream[Backend, BackendStats]): + backend = await stream.recv_message() + if backend.name not in self._backends.keys(): + raise GRPCError( + Status.NOT_FOUND, + "Backend doesn't exist", + ) + running = self._backends[backend.name].running + await stream.send_message(BackendStats(running=running)) diff --git a/marznode/service/service_grpc.py b/marznode/service/service_grpc.py index 3853d9e..c68cae3 100644 --- a/marznode/service/service_grpc.py +++ b/marznode/service/service_grpc.py @@ -23,7 +23,7 @@ async def RepopulateUsers(self, stream: 'grpclib.server.Stream[marznode.service. pass @abc.abstractmethod - async def FetchInbounds(self, stream: 'grpclib.server.Stream[marznode.service.service_pb2.Empty, marznode.service.service_pb2.InboundsResponse]') -> None: + async def FetchBackends(self, stream: 'grpclib.server.Stream[marznode.service.service_pb2.Empty, marznode.service.service_pb2.BackendsResponse]') -> None: pass @abc.abstractmethod @@ -31,15 +31,19 @@ async def FetchUsersStats(self, stream: 'grpclib.server.Stream[marznode.service. pass @abc.abstractmethod - async def FetchXrayConfig(self, stream: 'grpclib.server.Stream[marznode.service.service_pb2.Empty, marznode.service.service_pb2.XrayConfig]') -> None: + async def FetchBackendConfig(self, stream: 'grpclib.server.Stream[marznode.service.service_pb2.Backend, marznode.service.service_pb2.BackendConfig]') -> None: pass @abc.abstractmethod - async def RestartXray(self, stream: 'grpclib.server.Stream[marznode.service.service_pb2.XrayConfig, marznode.service.service_pb2.InboundsResponse]') -> None: + async def RestartBackend(self, stream: 'grpclib.server.Stream[marznode.service.service_pb2.RestartBackendRequest, marznode.service.service_pb2.Empty]') -> None: pass @abc.abstractmethod - async def StreamXrayLogs(self, stream: 'grpclib.server.Stream[marznode.service.service_pb2.XrayLogsRequest, marznode.service.service_pb2.LogLine]') -> None: + async def StreamBackendLogs(self, stream: 'grpclib.server.Stream[marznode.service.service_pb2.BackendLogsRequest, marznode.service.service_pb2.LogLine]') -> None: + pass + + @abc.abstractmethod + async def GetBackendStats(self, stream: 'grpclib.server.Stream[marznode.service.service_pb2.Backend, marznode.service.service_pb2.BackendStats]') -> None: pass def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]: @@ -56,11 +60,11 @@ def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]: marznode.service.service_pb2.UsersData, marznode.service.service_pb2.Empty, ), - '/marznode.MarzService/FetchInbounds': grpclib.const.Handler( - self.FetchInbounds, + '/marznode.MarzService/FetchBackends': grpclib.const.Handler( + self.FetchBackends, grpclib.const.Cardinality.UNARY_UNARY, marznode.service.service_pb2.Empty, - marznode.service.service_pb2.InboundsResponse, + marznode.service.service_pb2.BackendsResponse, ), '/marznode.MarzService/FetchUsersStats': grpclib.const.Handler( self.FetchUsersStats, @@ -68,24 +72,30 @@ def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]: marznode.service.service_pb2.Empty, marznode.service.service_pb2.UsersStats, ), - '/marznode.MarzService/FetchXrayConfig': grpclib.const.Handler( - self.FetchXrayConfig, + '/marznode.MarzService/FetchBackendConfig': grpclib.const.Handler( + self.FetchBackendConfig, grpclib.const.Cardinality.UNARY_UNARY, - marznode.service.service_pb2.Empty, - marznode.service.service_pb2.XrayConfig, + marznode.service.service_pb2.Backend, + marznode.service.service_pb2.BackendConfig, ), - '/marznode.MarzService/RestartXray': grpclib.const.Handler( - self.RestartXray, + '/marznode.MarzService/RestartBackend': grpclib.const.Handler( + self.RestartBackend, grpclib.const.Cardinality.UNARY_UNARY, - marznode.service.service_pb2.XrayConfig, - marznode.service.service_pb2.InboundsResponse, + marznode.service.service_pb2.RestartBackendRequest, + marznode.service.service_pb2.Empty, ), - '/marznode.MarzService/StreamXrayLogs': grpclib.const.Handler( - self.StreamXrayLogs, + '/marznode.MarzService/StreamBackendLogs': grpclib.const.Handler( + self.StreamBackendLogs, grpclib.const.Cardinality.UNARY_STREAM, - marznode.service.service_pb2.XrayLogsRequest, + marznode.service.service_pb2.BackendLogsRequest, marznode.service.service_pb2.LogLine, ), + '/marznode.MarzService/GetBackendStats': grpclib.const.Handler( + self.GetBackendStats, + grpclib.const.Cardinality.UNARY_UNARY, + marznode.service.service_pb2.Backend, + marznode.service.service_pb2.BackendStats, + ), } @@ -104,11 +114,11 @@ def __init__(self, channel: grpclib.client.Channel) -> None: marznode.service.service_pb2.UsersData, marznode.service.service_pb2.Empty, ) - self.FetchInbounds = grpclib.client.UnaryUnaryMethod( + self.FetchBackends = grpclib.client.UnaryUnaryMethod( channel, - '/marznode.MarzService/FetchInbounds', + '/marznode.MarzService/FetchBackends', marznode.service.service_pb2.Empty, - marznode.service.service_pb2.InboundsResponse, + marznode.service.service_pb2.BackendsResponse, ) self.FetchUsersStats = grpclib.client.UnaryUnaryMethod( channel, @@ -116,21 +126,27 @@ def __init__(self, channel: grpclib.client.Channel) -> None: marznode.service.service_pb2.Empty, marznode.service.service_pb2.UsersStats, ) - self.FetchXrayConfig = grpclib.client.UnaryUnaryMethod( + self.FetchBackendConfig = grpclib.client.UnaryUnaryMethod( channel, - '/marznode.MarzService/FetchXrayConfig', - marznode.service.service_pb2.Empty, - marznode.service.service_pb2.XrayConfig, + '/marznode.MarzService/FetchBackendConfig', + marznode.service.service_pb2.Backend, + marznode.service.service_pb2.BackendConfig, ) - self.RestartXray = grpclib.client.UnaryUnaryMethod( + self.RestartBackend = grpclib.client.UnaryUnaryMethod( channel, - '/marznode.MarzService/RestartXray', - marznode.service.service_pb2.XrayConfig, - marznode.service.service_pb2.InboundsResponse, + '/marznode.MarzService/RestartBackend', + marznode.service.service_pb2.RestartBackendRequest, + marznode.service.service_pb2.Empty, ) - self.StreamXrayLogs = grpclib.client.UnaryStreamMethod( + self.StreamBackendLogs = grpclib.client.UnaryStreamMethod( channel, - '/marznode.MarzService/StreamXrayLogs', - marznode.service.service_pb2.XrayLogsRequest, + '/marznode.MarzService/StreamBackendLogs', + marznode.service.service_pb2.BackendLogsRequest, marznode.service.service_pb2.LogLine, ) + self.GetBackendStats = grpclib.client.UnaryUnaryMethod( + channel, + '/marznode.MarzService/GetBackendStats', + marznode.service.service_pb2.Backend, + marznode.service.service_pb2.BackendStats, + ) diff --git a/marznode/service/service_pb2.py b/marznode/service/service_pb2.py index 62a5f87..3c7477d 100644 --- a/marznode/service/service_pb2.py +++ b/marznode/service/service_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: marznode/service/service.proto -# Protobuf Python Version: 4.25.0 +# Protobuf Python Version: 5.26.1 """Generated protocol buffer code.""" from google.protobuf import descriptor as _descriptor from google.protobuf import descriptor_pool as _descriptor_pool @@ -14,35 +14,43 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1emarznode/service/service.proto\x12\x08marznode\"\x07\n\x05\x45mpty\"6\n\x07Inbound\x12\x0b\n\x03tag\x18\x01 \x01(\t\x12\x13\n\x06\x63onfig\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\t\n\x07_config\"7\n\x10InboundsResponse\x12#\n\x08inbounds\x18\x01 \x03(\x0b\x32\x11.marznode.Inbound\"1\n\x04User\x12\n\n\x02id\x18\x01 \x01(\r\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0b\n\x03key\x18\x03 \x01(\t\"M\n\x08UserData\x12\x1c\n\x04user\x18\x01 \x01(\x0b\x32\x0e.marznode.User\x12#\n\x08inbounds\x18\x02 \x03(\x0b\x32\x11.marznode.Inbound\"3\n\tUsersData\x12&\n\nusers_data\x18\x01 \x03(\x0b\x32\x12.marznode.UserData\"j\n\nUsersStats\x12\x33\n\x0busers_stats\x18\x01 \x03(\x0b\x32\x1e.marznode.UsersStats.UserStats\x1a\'\n\tUserStats\x12\x0b\n\x03uid\x18\x01 \x01(\r\x12\r\n\x05usage\x18\x02 \x01(\x04\"\x17\n\x07LogLine\x12\x0c\n\x04line\x18\x01 \x01(\t\"#\n\nXrayConfig\x12\x15\n\rconfiguration\x18\x01 \x01(\t\")\n\x0fXrayLogsRequest\x12\x16\n\x0einclude_buffer\x18\x01 \x01(\x08\x32\xaf\x03\n\x0bMarzService\x12\x32\n\tSyncUsers\x12\x12.marznode.UserData\x1a\x0f.marznode.Empty(\x01\x12\x37\n\x0fRepopulateUsers\x12\x13.marznode.UsersData\x1a\x0f.marznode.Empty\x12<\n\rFetchInbounds\x12\x0f.marznode.Empty\x1a\x1a.marznode.InboundsResponse\x12\x38\n\x0f\x46\x65tchUsersStats\x12\x0f.marznode.Empty\x1a\x14.marznode.UsersStats\x12\x38\n\x0f\x46\x65tchXrayConfig\x12\x0f.marznode.Empty\x1a\x14.marznode.XrayConfig\x12?\n\x0bRestartXray\x12\x14.marznode.XrayConfig\x1a\x1a.marznode.InboundsResponse\x12@\n\x0eStreamXrayLogs\x12\x19.marznode.XrayLogsRequest\x1a\x11.marznode.LogLine0\x01\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1emarznode/service/service.proto\x12\x08marznode\"\x07\n\x05\x45mpty\"z\n\x07\x42\x61\x63kend\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x11\n\x04type\x18\x02 \x01(\tH\x00\x88\x01\x01\x12\x14\n\x07version\x18\x03 \x01(\tH\x01\x88\x01\x01\x12#\n\x08inbounds\x18\x04 \x03(\x0b\x32\x11.marznode.InboundB\x07\n\x05_typeB\n\n\x08_version\"7\n\x10\x42\x61\x63kendsResponse\x12#\n\x08\x62\x61\x63kends\x18\x01 \x03(\x0b\x32\x11.marznode.Backend\"6\n\x07Inbound\x12\x0b\n\x03tag\x18\x01 \x01(\t\x12\x13\n\x06\x63onfig\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\t\n\x07_config\"1\n\x04User\x12\n\n\x02id\x18\x01 \x01(\r\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0b\n\x03key\x18\x03 \x01(\t\"M\n\x08UserData\x12\x1c\n\x04user\x18\x01 \x01(\x0b\x32\x0e.marznode.User\x12#\n\x08inbounds\x18\x02 \x03(\x0b\x32\x11.marznode.Inbound\"3\n\tUsersData\x12&\n\nusers_data\x18\x01 \x03(\x0b\x32\x12.marznode.UserData\"j\n\nUsersStats\x12\x33\n\x0busers_stats\x18\x01 \x03(\x0b\x32\x1e.marznode.UsersStats.UserStats\x1a\'\n\tUserStats\x12\x0b\n\x03uid\x18\x01 \x01(\r\x12\r\n\x05usage\x18\x02 \x01(\x04\"\x17\n\x07LogLine\x12\x0c\n\x04line\x18\x01 \x01(\t\"U\n\rBackendConfig\x12\x15\n\rconfiguration\x18\x01 \x01(\t\x12-\n\rconfig_format\x18\x02 \x01(\x0e\x32\x16.marznode.ConfigFormat\"B\n\x12\x42\x61\x63kendLogsRequest\x12\x14\n\x0c\x62\x61\x63kend_name\x18\x01 \x01(\t\x12\x16\n\x0einclude_buffer\x18\x02 \x01(\x08\"f\n\x15RestartBackendRequest\x12\x14\n\x0c\x62\x61\x63kend_name\x18\x01 \x01(\t\x12,\n\x06\x63onfig\x18\x02 \x01(\x0b\x32\x17.marznode.BackendConfigH\x00\x88\x01\x01\x42\t\n\x07_config\"\x1f\n\x0c\x42\x61\x63kendStats\x12\x0f\n\x07running\x18\x01 \x01(\x08*-\n\x0c\x43onfigFormat\x12\t\n\x05PLAIN\x10\x00\x12\x08\n\x04JSON\x10\x01\x12\x08\n\x04YAML\x10\x02\x32\xfe\x03\n\x0bMarzService\x12\x32\n\tSyncUsers\x12\x12.marznode.UserData\x1a\x0f.marznode.Empty(\x01\x12\x37\n\x0fRepopulateUsers\x12\x13.marznode.UsersData\x1a\x0f.marznode.Empty\x12<\n\rFetchBackends\x12\x0f.marznode.Empty\x1a\x1a.marznode.BackendsResponse\x12\x38\n\x0f\x46\x65tchUsersStats\x12\x0f.marznode.Empty\x1a\x14.marznode.UsersStats\x12@\n\x12\x46\x65tchBackendConfig\x12\x11.marznode.Backend\x1a\x17.marznode.BackendConfig\x12\x42\n\x0eRestartBackend\x12\x1f.marznode.RestartBackendRequest\x1a\x0f.marznode.Empty\x12\x46\n\x11StreamBackendLogs\x12\x1c.marznode.BackendLogsRequest\x1a\x11.marznode.LogLine0\x01\x12<\n\x0fGetBackendStats\x12\x11.marznode.Backend\x1a\x16.marznode.BackendStatsb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'marznode.service.service_pb2', _globals) -if _descriptor._USE_C_DESCRIPTORS == False: - DESCRIPTOR._options = None +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_CONFIGFORMAT']._serialized_start=898 + _globals['_CONFIGFORMAT']._serialized_end=943 _globals['_EMPTY']._serialized_start=44 _globals['_EMPTY']._serialized_end=51 - _globals['_INBOUND']._serialized_start=53 - _globals['_INBOUND']._serialized_end=107 - _globals['_INBOUNDSRESPONSE']._serialized_start=109 - _globals['_INBOUNDSRESPONSE']._serialized_end=164 - _globals['_USER']._serialized_start=166 - _globals['_USER']._serialized_end=215 - _globals['_USERDATA']._serialized_start=217 - _globals['_USERDATA']._serialized_end=294 - _globals['_USERSDATA']._serialized_start=296 - _globals['_USERSDATA']._serialized_end=347 - _globals['_USERSSTATS']._serialized_start=349 - _globals['_USERSSTATS']._serialized_end=455 - _globals['_USERSSTATS_USERSTATS']._serialized_start=416 - _globals['_USERSSTATS_USERSTATS']._serialized_end=455 - _globals['_LOGLINE']._serialized_start=457 - _globals['_LOGLINE']._serialized_end=480 - _globals['_XRAYCONFIG']._serialized_start=482 - _globals['_XRAYCONFIG']._serialized_end=517 - _globals['_XRAYLOGSREQUEST']._serialized_start=519 - _globals['_XRAYLOGSREQUEST']._serialized_end=560 - _globals['_MARZSERVICE']._serialized_start=563 - _globals['_MARZSERVICE']._serialized_end=994 + _globals['_BACKEND']._serialized_start=53 + _globals['_BACKEND']._serialized_end=175 + _globals['_BACKENDSRESPONSE']._serialized_start=177 + _globals['_BACKENDSRESPONSE']._serialized_end=232 + _globals['_INBOUND']._serialized_start=234 + _globals['_INBOUND']._serialized_end=288 + _globals['_USER']._serialized_start=290 + _globals['_USER']._serialized_end=339 + _globals['_USERDATA']._serialized_start=341 + _globals['_USERDATA']._serialized_end=418 + _globals['_USERSDATA']._serialized_start=420 + _globals['_USERSDATA']._serialized_end=471 + _globals['_USERSSTATS']._serialized_start=473 + _globals['_USERSSTATS']._serialized_end=579 + _globals['_USERSSTATS_USERSTATS']._serialized_start=540 + _globals['_USERSSTATS_USERSTATS']._serialized_end=579 + _globals['_LOGLINE']._serialized_start=581 + _globals['_LOGLINE']._serialized_end=604 + _globals['_BACKENDCONFIG']._serialized_start=606 + _globals['_BACKENDCONFIG']._serialized_end=691 + _globals['_BACKENDLOGSREQUEST']._serialized_start=693 + _globals['_BACKENDLOGSREQUEST']._serialized_end=759 + _globals['_RESTARTBACKENDREQUEST']._serialized_start=761 + _globals['_RESTARTBACKENDREQUEST']._serialized_end=863 + _globals['_BACKENDSTATS']._serialized_start=865 + _globals['_BACKENDSTATS']._serialized_end=896 + _globals['_MARZSERVICE']._serialized_start=946 + _globals['_MARZSERVICE']._serialized_end=1456 # @@protoc_insertion_point(module_scope) diff --git a/marznode/service/service_pb2.pyi b/marznode/service/service_pb2.pyi new file mode 100644 index 0000000..3348be7 --- /dev/null +++ b/marznode/service/service_pb2.pyi @@ -0,0 +1,119 @@ +from google.protobuf.internal import containers as _containers +from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union + +DESCRIPTOR: _descriptor.FileDescriptor + +class ConfigFormat(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + PLAIN: _ClassVar[ConfigFormat] + JSON: _ClassVar[ConfigFormat] + YAML: _ClassVar[ConfigFormat] +PLAIN: ConfigFormat +JSON: ConfigFormat +YAML: ConfigFormat + +class Empty(_message.Message): + __slots__ = () + def __init__(self) -> None: ... + +class Backend(_message.Message): + __slots__ = ("name", "type", "version", "inbounds") + NAME_FIELD_NUMBER: _ClassVar[int] + TYPE_FIELD_NUMBER: _ClassVar[int] + VERSION_FIELD_NUMBER: _ClassVar[int] + INBOUNDS_FIELD_NUMBER: _ClassVar[int] + name: str + type: str + version: str + inbounds: _containers.RepeatedCompositeFieldContainer[Inbound] + def __init__(self, name: _Optional[str] = ..., type: _Optional[str] = ..., version: _Optional[str] = ..., inbounds: _Optional[_Iterable[_Union[Inbound, _Mapping]]] = ...) -> None: ... + +class BackendsResponse(_message.Message): + __slots__ = ("backends",) + BACKENDS_FIELD_NUMBER: _ClassVar[int] + backends: _containers.RepeatedCompositeFieldContainer[Backend] + def __init__(self, backends: _Optional[_Iterable[_Union[Backend, _Mapping]]] = ...) -> None: ... + +class Inbound(_message.Message): + __slots__ = ("tag", "config") + TAG_FIELD_NUMBER: _ClassVar[int] + CONFIG_FIELD_NUMBER: _ClassVar[int] + tag: str + config: str + def __init__(self, tag: _Optional[str] = ..., config: _Optional[str] = ...) -> None: ... + +class User(_message.Message): + __slots__ = ("id", "username", "key") + ID_FIELD_NUMBER: _ClassVar[int] + USERNAME_FIELD_NUMBER: _ClassVar[int] + KEY_FIELD_NUMBER: _ClassVar[int] + id: int + username: str + key: str + def __init__(self, id: _Optional[int] = ..., username: _Optional[str] = ..., key: _Optional[str] = ...) -> None: ... + +class UserData(_message.Message): + __slots__ = ("user", "inbounds") + USER_FIELD_NUMBER: _ClassVar[int] + INBOUNDS_FIELD_NUMBER: _ClassVar[int] + user: User + inbounds: _containers.RepeatedCompositeFieldContainer[Inbound] + def __init__(self, user: _Optional[_Union[User, _Mapping]] = ..., inbounds: _Optional[_Iterable[_Union[Inbound, _Mapping]]] = ...) -> None: ... + +class UsersData(_message.Message): + __slots__ = ("users_data",) + USERS_DATA_FIELD_NUMBER: _ClassVar[int] + users_data: _containers.RepeatedCompositeFieldContainer[UserData] + def __init__(self, users_data: _Optional[_Iterable[_Union[UserData, _Mapping]]] = ...) -> None: ... + +class UsersStats(_message.Message): + __slots__ = ("users_stats",) + class UserStats(_message.Message): + __slots__ = ("uid", "usage") + UID_FIELD_NUMBER: _ClassVar[int] + USAGE_FIELD_NUMBER: _ClassVar[int] + uid: int + usage: int + def __init__(self, uid: _Optional[int] = ..., usage: _Optional[int] = ...) -> None: ... + USERS_STATS_FIELD_NUMBER: _ClassVar[int] + users_stats: _containers.RepeatedCompositeFieldContainer[UsersStats.UserStats] + def __init__(self, users_stats: _Optional[_Iterable[_Union[UsersStats.UserStats, _Mapping]]] = ...) -> None: ... + +class LogLine(_message.Message): + __slots__ = ("line",) + LINE_FIELD_NUMBER: _ClassVar[int] + line: str + def __init__(self, line: _Optional[str] = ...) -> None: ... + +class BackendConfig(_message.Message): + __slots__ = ("configuration", "config_format") + CONFIGURATION_FIELD_NUMBER: _ClassVar[int] + CONFIG_FORMAT_FIELD_NUMBER: _ClassVar[int] + configuration: str + config_format: ConfigFormat + def __init__(self, configuration: _Optional[str] = ..., config_format: _Optional[_Union[ConfigFormat, str]] = ...) -> None: ... + +class BackendLogsRequest(_message.Message): + __slots__ = ("backend_name", "include_buffer") + BACKEND_NAME_FIELD_NUMBER: _ClassVar[int] + INCLUDE_BUFFER_FIELD_NUMBER: _ClassVar[int] + backend_name: str + include_buffer: bool + def __init__(self, backend_name: _Optional[str] = ..., include_buffer: bool = ...) -> None: ... + +class RestartBackendRequest(_message.Message): + __slots__ = ("backend_name", "config") + BACKEND_NAME_FIELD_NUMBER: _ClassVar[int] + CONFIG_FIELD_NUMBER: _ClassVar[int] + backend_name: str + config: BackendConfig + def __init__(self, backend_name: _Optional[str] = ..., config: _Optional[_Union[BackendConfig, _Mapping]] = ...) -> None: ... + +class BackendStats(_message.Message): + __slots__ = ("running",) + RUNNING_FIELD_NUMBER: _ClassVar[int] + running: bool + def __init__(self, running: bool = ...) -> None: ... diff --git a/marznode/storage/memory.py b/marznode/storage/memory.py index 6a5ee49..792412f 100644 --- a/marznode/storage/memory.py +++ b/marznode/storage/memory.py @@ -48,6 +48,8 @@ def remove_inbound(self, inbound: Inbound | str) -> None: tag = inbound if isinstance(inbound, str) else inbound.tag if tag in self.storage["inbounds"]: self.storage["inbounds"].pop(tag) + for user_id, user in self.storage["users"].items(): + user.inbounds = list(filter(lambda inb: inb.tag != tag, user.inbounds)) async def flush_users(self): self.storage["users"] = {}