Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add mechanism to unblock dsgsessioon poll messages method when a PyEnSight gRPC request is incoming #505

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/ansys/pyensight/core/ensight_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@
import sys
import tempfile
import threading
from typing import Any, Callable, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Union
import uuid

from ansys.api.pyensight.v0 import dynamic_scene_graph_pb2_grpc, ensight_pb2, ensight_pb2_grpc
import grpc

if TYPE_CHECKING:
from ansys.pyensight.core.utils.dsg_server import DSGSession


class EnSightGRPC(object):
"""Wrapper around a gRPC connection to an EnSight instance
Expand Down Expand Up @@ -60,6 +63,10 @@ def __init__(self, host: str = "127.0.0.1", port: int = 12345, secret_key: str =
self._image = None
self._image_number = 0
self._sub_service = None
self._dsg_session: Optional["DSGSession"] = None

def set_dsg_session(self, dsg_session: "DSGSession"):
self._dsg_session = dsg_session

@property
def host(self) -> str:
Expand Down
20 changes: 20 additions & 0 deletions src/ansys/pyensight/core/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from ansys.api.pyensight import ensight_api
from ansys.pyensight.core import enscontext, ensight_grpc, renderable
from ansys.pyensight.core.ensobj import ENSOBJ
from ansys.pyensight.core.utils.dsg_server import DSGSession


class InvalidEnSightVersion(Exception):
Expand Down Expand Up @@ -140,6 +141,7 @@ def __init__(
webui_port: Optional[int] = None,
) -> None:
# every session instance needs a unique name that can be used as a cache key
self._dsg_session: Optional["DSGSession"] = None
self._session_name = str(uuid.uuid1())
# when objects come into play, we can reuse them, so hash ID to instance here
self._ensobj_hash: Dict[int, "ENSOBJ"] = {}
Expand Down Expand Up @@ -962,8 +964,12 @@ def cmd(self, value: str, do_eval: bool = True) -> Any:
>>> print(session.cmd("10+4"))
14
"""
if self._dsg_session:
self._dsg_session._pyensight_grpc_coming = True
self._establish_connection()
ret = self._grpc.command(value, do_eval=do_eval)
if self._dsg_session:
self._dsg_session._pyensight_grpc_coming = False
if do_eval:
ret = self._convert_ctor(ret)
value = eval(ret, dict(session=self, ensobjlist=ensobjlist))
Expand Down Expand Up @@ -1820,3 +1826,17 @@ def find_remote_unused_ports(
cmd += f"ports = find_unused_ports({count}, start={start}, end={end}, avoid={avoid})"
self.cmd(cmd, do_eval=False)
return self.cmd("ports")

def set_dsg_session(self, dsg_session: "DSGSession"):
"""Set a DSG Session for the current PyEnSight session.

This is required if a DSGSession is running together with
PyEnSight and the second might send gRPC requests while the first
is blocked because the gRPC queue is full.

Parameters
----------
dsg_session: DSGSession
a DSGSession object
"""
self._dsg_session = dsg_session
19 changes: 17 additions & 2 deletions src/ansys/pyensight/core/utils/dsg_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@
import sys
import threading
import time
from typing import Any, Dict, List, Optional
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from ansys.api.pyensight.v0 import dynamic_scene_graph_pb2
from ansys.pyensight.core import ensight_grpc
import numpy

if TYPE_CHECKING:
from ansys.pyensight.core import Session


class Part(object):
def __init__(self, session: "DSGSession"):
Expand Down Expand Up @@ -640,6 +643,7 @@ def __init__(
vrmode: bool = False,
time_scale: float = 1.0,
handler: UpdateHandler = UpdateHandler(),
session: Optional["Session"] = None,
):
"""
Manage a gRPC connection and link it to an UpdateHandler instance
Expand Down Expand Up @@ -678,6 +682,9 @@ def __init__(
"""
super().__init__()
self._grpc = ensight_grpc.EnSightGRPC(port=port, host=host, secret_key=security_code)
self._session = session
if self._session:
self._session.set_dsg_session(self)
self._callback_handler = handler
self._verbose = verbose
self._thread: Optional[threading.Thread] = None
Expand Down Expand Up @@ -705,6 +712,7 @@ def __init__(
# log any status changes to this file. external apps will be monitoring
self._status_file = os.environ.get("ANSYS_OV_SERVER_STATUS_FILENAME", "")
self._status = dict(status="idle", start_time=0.0, processed_buffers=0, total_buffers=0)
self._pyensight_grpc_coming = False

@property
def scene_bounds(self) -> Optional[List]:
Expand Down Expand Up @@ -892,6 +900,13 @@ def request_an_update(self, animation: bool = False, allow_spontaneous: bool = T
cmd.init.maximum_chunk_size = 1024 * 1024
self._dsg_queue.put(cmd) # type:ignore

def _is_queue_full(self):
if not self.max_dsg_queue_size:
return False
if self._pyensight_grpc_coming:
return False
return self._message_queue.qsize() >= self.max_dsg_queue_size

def _poll_messages(self) -> None:
"""Core interface to grab DSG events from gRPC and queue them for processing

Expand All @@ -905,7 +920,7 @@ def _poll_messages(self) -> None:
# if the queue is getting too deep, wait a bit to avoid holding too
# many messages (filling up memory)
if self.max_dsg_queue_size:
while self._message_queue.qsize() >= self.max_dsg_queue_size:
while self._is_queue_full():
time.sleep(0.001)
except Exception:
self._shutdown = True
Expand Down
Loading