Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate JsonRPCServer.start_tcp and JsonRPCServer.start_ws to high level asyncio APIs #507

Merged
merged 6 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/source/howto/migrate-to-v2.rst
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,22 @@ If you need to access the underlying protocol object this is now via the ``proto

pygls' base server class has been renamed

Removed ``loop`` argument from ``pygls.server.JsonRPCServer``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Servers and clients in pygls v2 now both use the high level asyncio API, removing the need for an explicit ``loop`` argument to be passed in.
If you need control over the event loop used by pygls you can use functions like :external:py:function:`asyncio.set_event_loop` before starting the server/client.

Removed ``multiprocessing.pool.ThreadPool``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The :external:py:class:`multiprocessing.pool.ThreadPool` instance has been removed, *pygls* now makes use of :external:py:class:`concurrent.futures.ThreadPoolExecutor` for all threaded tasks.

The ``thread_pool_executor`` attribute of the base ``JsonRPCServer`` class has been removed, the ``ThreadPoolExecutor`` can be accessed via the ``thread_pool`` attribute instead.

New ``pygls.io_`` module
^^^^^^^^^^^^^^^^^^^^^^^^

There is a new ``pygls.io_`` module containing main message parsing loop code common to both client and server

- The equivlaent to pygls v1's ``pygls.server.aio_readline`` function is now ``pygls.io_.run_async``
50 changes: 14 additions & 36 deletions pygls/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@
from __future__ import annotations

import asyncio
import json
import logging
import sys
import typing
from threading import Event

from pygls.exceptions import JsonRpcException, JsonRpcInternalError, PyglsError
from pygls.io_ import run_async
from pygls.exceptions import JsonRpcException, PyglsError
from pygls.io_ import run_async, run_websocket
from pygls.protocol import JsonRPCProtocol, default_converter
from pygls.server import WebSocketTransportAdapter

if typing.TYPE_CHECKING:
from typing import Any
Expand All @@ -37,8 +35,6 @@

from cattrs import Converter

from websockets.asyncio.client import ClientConnection

logger = logging.getLogger(__name__)


Expand All @@ -57,7 +53,7 @@ def __init__(

self._server: Optional[asyncio.subprocess.Process] = None
self._stop_event = Event()
self._async_tasks: List[asyncio.Task] = []
self._async_tasks: List[asyncio.Task[Any]] = []

@property
def stopped(self) -> bool:
Expand Down Expand Up @@ -148,37 +144,19 @@ async def start_ws(self, host: str, port: int):

uri = f"ws://{host}:{port}"
websocket = await connect(uri)

self.protocol._send_only_body = True
self.protocol.connection_made(WebSocketTransportAdapter(websocket)) # type: ignore

connection = asyncio.create_task(self.run_websocket(websocket))
self._async_tasks.extend([connection])

async def run_websocket(self, websocket: ClientConnection):
"""Run the main message processing loop, over websockets."""

try:
from websockets.exceptions import ConnectionClosedOK
except ImportError:
logger.exception(
"Run `pip install pygls[ws]` to install dependencies required for websockets."
connection = asyncio.create_task(
run_websocket(
stop_event=self._stop_event,
websocket=websocket,
protocol=self.protocol,
logger=logger,
error_handler=self.report_server_error,
)
return
)
self._async_tasks.extend([connection])

while not self._stop_event.is_set():
try:
data = await websocket.recv(decode=False)
except ConnectionClosedOK:
self._stop_event.set()
break

try:
message = json.loads(data, object_hook=self.protocol.structure_message)
self.protocol.handle_message(message)
except Exception as exc:
logger.exception("Unable to handle message")
self._report_server_error(exc, JsonRpcInternalError)
# Yield control to the event loop, gives the run_websocket task chance to spin up.
await asyncio.sleep(0)

async def _server_exit(self):
"""Cleanup handler that runs when the server process managed by the client exits"""
Expand Down
87 changes: 87 additions & 0 deletions pygls/io_.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
from concurrent.futures import ThreadPoolExecutor
from typing import Any, BinaryIO, Callable, Protocol

from websockets.asyncio.client import ClientConnection
from websockets.asyncio.server import ServerConnection

from pygls.protocol import JsonRPCProtocol

class Reader(Protocol):
Expand Down Expand Up @@ -70,6 +73,30 @@ def readexactly(self, n: int) -> Awaitable[bytes]:
return self.loop.run_in_executor(self.executor, self.stdin.read, n)


class WebSocketTransportAdapter:
"""Protocol adapter which calls write method.

Write method sends data via the WebSocket interface.
"""

def __init__(self, ws: ServerConnection | ClientConnection):
self._ws = ws
self._loop: asyncio.AbstractEventLoop | None = None

@property
def loop(self):
if self._loop is None:
self._loop = asyncio.get_running_loop()

return self._loop

def close(self) -> None:
asyncio.ensure_future(self._ws.close())

def write(self, data: Any) -> None:
asyncio.ensure_future(self._ws.send(data))


async def run_async(
stop_event: threading.Event,
reader: AsyncReader,
Expand Down Expand Up @@ -191,3 +218,63 @@ def run(
finally:
# Reset
content_length = 0


async def run_websocket(
websocket: ClientConnection | ServerConnection,
stop_event: threading.Event,
protocol: JsonRPCProtocol,
logger: logging.Logger | None = None,
error_handler: Callable[[Exception, type[JsonRpcException]], Any] | None = None,
):
"""Run the main message processing loop, over websockets.

Parameters
----------
stop_event
A ``threading.Event`` used to break the main loop

websocket
The websocket to read messages from

protocol
The protocol instance that should handle the messages

logger
The logger instance to use

error_handler
Function to call when an error is encountered.
"""

logger = logger or logging.getLogger(__name__)
protocol._send_only_body = True # Don't send headers within the payload
protocol.connection_made(WebSocketTransportAdapter(websocket)) # type: ignore

try:
from websockets.exceptions import ConnectionClosed
except ImportError:
logger.exception(
"Run `pip install pygls[ws]` to install dependencies required for websockets."
)
return

while not stop_event.is_set():
try:
logger.debug("waiting for a message...")
data = await websocket.recv(decode=False)
except ConnectionClosed:
logger.debug("Websocket connection closed.")
stop_event.set()
break

try:
message = json.loads(data, object_hook=protocol.structure_message)
protocol.handle_message(message)
except Exception as exc:
logger.exception("Unable to handle message")
if error_handler:
error_handler(exc, JsonRpcException)

logger.debug("Exiting main loop")
await websocket.close()
14 changes: 8 additions & 6 deletions pygls/protocol/json_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,29 @@
# limitations under the License. #
############################################################################
from __future__ import annotations

import asyncio
import enum
import json
import logging
import re
import sys
import uuid
import traceback
import uuid
from concurrent.futures import Future
from functools import partial
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Optional,
Type,
Union,
TYPE_CHECKING,
)

import attrs
from cattrs.errors import ClassValidationError

from lsprotocol.types import (
CANCEL_REQUEST,
EXIT,
Expand All @@ -47,19 +47,21 @@
)

from pygls.exceptions import (
FeatureNotificationError,
FeatureRequestError,
JsonRpcException,
JsonRpcInternalError,
JsonRpcInvalidParams,
JsonRpcMethodNotFound,
JsonRpcRequestCancelled,
FeatureNotificationError,
FeatureRequestError,
)
from pygls.feature_manager import FeatureManager, is_thread_function

if TYPE_CHECKING:
from cattrs import Converter
from pygls.server import JsonRPCServer, WebSocketTransportAdapter

from pygls.io_ import WebSocketTransportAdapter
from pygls.server import JsonRPCServer


logger = logging.getLogger(__name__)
Expand Down
3 changes: 0 additions & 3 deletions pygls/protocol/language_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import json
import logging
import sys
import typing
from functools import lru_cache
from itertools import zip_longest
Expand Down Expand Up @@ -117,8 +116,6 @@ def lsp_exit(self, *args) -> None:
if self.transport is not None:
self.transport.close()

sys.exit(0 if self._shutdown else 1)

@lsp_method(types.INITIALIZE)
def lsp_initialize(self, params: types.InitializeParams) -> types.InitializeResult:
"""Method that initializes language server.
Expand Down
Loading
Loading