diff --git a/marznode/service/service.proto b/marznode/service/service.proto index 31e636f..f7b1f64 100644 --- a/marznode/service/service.proto +++ b/marznode/service/service.proto @@ -44,6 +44,10 @@ message XrayConfig { string configuration = 1; } +message XrayLogsRequest { + bool include_buffer = 1; +} + service MarzService { rpc SyncUsers(stream UserData) returns (Empty); rpc RepopulateUsers(UsersData) returns (Empty); @@ -51,5 +55,5 @@ service MarzService { rpc FetchUsersStats(Empty) returns (UsersStats); rpc FetchXrayConfig(Empty) returns (XrayConfig); rpc RestartXray(XrayConfig) returns (InboundsResponse); - rpc StreamXrayLogs(Empty) returns(stream LogLine); + rpc StreamXrayLogs(XrayLogsRequest) returns(stream LogLine); } diff --git a/marznode/service/service.py b/marznode/service/service.py index bbc4410..ca10068 100644 --- a/marznode/service/service.py +++ b/marznode/service/service.py @@ -13,7 +13,7 @@ from marznode.xray_api.exceptions import EmailExistsError, EmailNotFoundError from marznode.xray_api.types.account import accounts_map from .service_grpc import MarzServiceBase -from .service_pb2 import UserData, Empty, InboundsResponse, Inbound, UsersStats +from .service_pb2 import UserData, Empty, InboundsResponse, Inbound, UsersStats, LogLine from .service_pb2 import XrayConfig as XrayConfig_pb2 from .. import config from ..xray.base import XrayCore @@ -130,12 +130,17 @@ async def FetchUsersStats(self, await stream.send_message(UsersStats(users_stats=user_stats)) async def StreamXrayLogs(self, - stream: 'grpclib.server.Stream[marznode.service.service_pb2.Empty,' - 'marznode.service.service_pb2.LogLine]') -> None: - pass - - async def FetchXrayConfig(self, stream: 'grpclib.server.Stream[marznode.service.service_pb2.Empty,' - 'marznode.service.service_pb2.XrayConfig]') -> None: + stream: Stream[Empty, LogLine]) -> None: + req = await stream.recv_message() + if req.include_buffer: + for line in self.xray.get_buffer(): + await stream.send_message(LogLine(line=line)) + log_stm = await self.xray.get_logs_stm() + async with log_stm: + async for line in log_stm: + await stream.send_message(LogLine(line=line)) + + async def FetchXrayConfig(self, stream: Stream[Empty, XrayConfig_pb2]) -> None: await stream.recv_message() with open(config.XRAY_CONFIG_PATH, 'r') as f: content = f.read() diff --git a/marznode/service/service_grpc.py b/marznode/service/service_grpc.py index b01dfc4..3853d9e 100644 --- a/marznode/service/service_grpc.py +++ b/marznode/service/service_grpc.py @@ -39,7 +39,7 @@ async def RestartXray(self, stream: 'grpclib.server.Stream[marznode.service.serv pass @abc.abstractmethod - async def StreamXrayLogs(self, stream: 'grpclib.server.Stream[marznode.service.service_pb2.Empty, marznode.service.service_pb2.LogLine]') -> None: + async def StreamXrayLogs(self, stream: 'grpclib.server.Stream[marznode.service.service_pb2.XrayLogsRequest, marznode.service.service_pb2.LogLine]') -> None: pass def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]: @@ -83,7 +83,7 @@ def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]: '/marznode.MarzService/StreamXrayLogs': grpclib.const.Handler( self.StreamXrayLogs, grpclib.const.Cardinality.UNARY_STREAM, - marznode.service.service_pb2.Empty, + marznode.service.service_pb2.XrayLogsRequest, marznode.service.service_pb2.LogLine, ), } @@ -131,6 +131,6 @@ def __init__(self, channel: grpclib.client.Channel) -> None: self.StreamXrayLogs = grpclib.client.UnaryStreamMethod( channel, '/marznode.MarzService/StreamXrayLogs', - marznode.service.service_pb2.Empty, + marznode.service.service_pb2.XrayLogsRequest, marznode.service.service_pb2.LogLine, ) diff --git a/marznode/service/service_pb2.py b/marznode/service/service_pb2.py index 7dd7f59..62a5f87 100644 --- a/marznode/service/service_pb2.py +++ b/marznode/service/service_pb2.py @@ -14,7 +14,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1emarznode/service/service.proto\x12\x08marznode\"\x07\n\x05\x45mpty\"6\n\x07Inbound\x12\x0b\n\x03tag\x18\x01 \x01(\t\x12\x13\n\x06\x63onfig\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\t\n\x07_config\"7\n\x10InboundsResponse\x12#\n\x08inbounds\x18\x01 \x03(\x0b\x32\x11.marznode.Inbound\"1\n\x04User\x12\n\n\x02id\x18\x01 \x01(\r\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0b\n\x03key\x18\x03 \x01(\t\"M\n\x08UserData\x12\x1c\n\x04user\x18\x01 \x01(\x0b\x32\x0e.marznode.User\x12#\n\x08inbounds\x18\x02 \x03(\x0b\x32\x11.marznode.Inbound\"3\n\tUsersData\x12&\n\nusers_data\x18\x01 \x03(\x0b\x32\x12.marznode.UserData\"j\n\nUsersStats\x12\x33\n\x0busers_stats\x18\x01 \x03(\x0b\x32\x1e.marznode.UsersStats.UserStats\x1a\'\n\tUserStats\x12\x0b\n\x03uid\x18\x01 \x01(\r\x12\r\n\x05usage\x18\x02 \x01(\x04\"\x17\n\x07LogLine\x12\x0c\n\x04line\x18\x01 \x01(\t\"#\n\nXrayConfig\x12\x15\n\rconfiguration\x18\x01 \x01(\t2\xa5\x03\n\x0bMarzService\x12\x32\n\tSyncUsers\x12\x12.marznode.UserData\x1a\x0f.marznode.Empty(\x01\x12\x37\n\x0fRepopulateUsers\x12\x13.marznode.UsersData\x1a\x0f.marznode.Empty\x12<\n\rFetchInbounds\x12\x0f.marznode.Empty\x1a\x1a.marznode.InboundsResponse\x12\x38\n\x0f\x46\x65tchUsersStats\x12\x0f.marznode.Empty\x1a\x14.marznode.UsersStats\x12\x38\n\x0f\x46\x65tchXrayConfig\x12\x0f.marznode.Empty\x1a\x14.marznode.XrayConfig\x12?\n\x0bRestartXray\x12\x14.marznode.XrayConfig\x1a\x1a.marznode.InboundsResponse\x12\x36\n\x0eStreamXrayLogs\x12\x0f.marznode.Empty\x1a\x11.marznode.LogLine0\x01\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1emarznode/service/service.proto\x12\x08marznode\"\x07\n\x05\x45mpty\"6\n\x07Inbound\x12\x0b\n\x03tag\x18\x01 \x01(\t\x12\x13\n\x06\x63onfig\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\t\n\x07_config\"7\n\x10InboundsResponse\x12#\n\x08inbounds\x18\x01 \x03(\x0b\x32\x11.marznode.Inbound\"1\n\x04User\x12\n\n\x02id\x18\x01 \x01(\r\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0b\n\x03key\x18\x03 \x01(\t\"M\n\x08UserData\x12\x1c\n\x04user\x18\x01 \x01(\x0b\x32\x0e.marznode.User\x12#\n\x08inbounds\x18\x02 \x03(\x0b\x32\x11.marznode.Inbound\"3\n\tUsersData\x12&\n\nusers_data\x18\x01 \x03(\x0b\x32\x12.marznode.UserData\"j\n\nUsersStats\x12\x33\n\x0busers_stats\x18\x01 \x03(\x0b\x32\x1e.marznode.UsersStats.UserStats\x1a\'\n\tUserStats\x12\x0b\n\x03uid\x18\x01 \x01(\r\x12\r\n\x05usage\x18\x02 \x01(\x04\"\x17\n\x07LogLine\x12\x0c\n\x04line\x18\x01 \x01(\t\"#\n\nXrayConfig\x12\x15\n\rconfiguration\x18\x01 \x01(\t\")\n\x0fXrayLogsRequest\x12\x16\n\x0einclude_buffer\x18\x01 \x01(\x08\x32\xaf\x03\n\x0bMarzService\x12\x32\n\tSyncUsers\x12\x12.marznode.UserData\x1a\x0f.marznode.Empty(\x01\x12\x37\n\x0fRepopulateUsers\x12\x13.marznode.UsersData\x1a\x0f.marznode.Empty\x12<\n\rFetchInbounds\x12\x0f.marznode.Empty\x1a\x1a.marznode.InboundsResponse\x12\x38\n\x0f\x46\x65tchUsersStats\x12\x0f.marznode.Empty\x1a\x14.marznode.UsersStats\x12\x38\n\x0f\x46\x65tchXrayConfig\x12\x0f.marznode.Empty\x1a\x14.marznode.XrayConfig\x12?\n\x0bRestartXray\x12\x14.marznode.XrayConfig\x1a\x1a.marznode.InboundsResponse\x12@\n\x0eStreamXrayLogs\x12\x19.marznode.XrayLogsRequest\x1a\x11.marznode.LogLine0\x01\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -41,6 +41,8 @@ _globals['_LOGLINE']._serialized_end=480 _globals['_XRAYCONFIG']._serialized_start=482 _globals['_XRAYCONFIG']._serialized_end=517 - _globals['_MARZSERVICE']._serialized_start=520 - _globals['_MARZSERVICE']._serialized_end=941 + _globals['_XRAYLOGSREQUEST']._serialized_start=519 + _globals['_XRAYLOGSREQUEST']._serialized_end=560 + _globals['_MARZSERVICE']._serialized_start=563 + _globals['_MARZSERVICE']._serialized_end=994 # @@protoc_insertion_point(module_scope)