Skip to content

Commit

Permalink
Merge branch 'main' into feat/adding-channel-subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
germa89 authored Aug 8, 2024
2 parents 6d886c9 + 75f2b7e commit a51a318
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ fail_fast: True
repos:

- repo: https://github.com/ansys/pre-commit-hooks
rev: v0.4.2
rev: v0.4.3
hooks:
- id: add-license-headers
args:
Expand Down
1 change: 1 addition & 0 deletions doc/changelog.d/3319.miscellaneous.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
feat: adding more descriptive errors
1 change: 1 addition & 0 deletions doc/changelog.d/3339.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
build: update pre-commit-hook
10 changes: 7 additions & 3 deletions src/ansys/mapdl/core/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@
import grpc

from ansys.mapdl.core.errors import MapdlConnectionError

from ..mapdl_grpc import MapdlGrpc
from ansys.mapdl.core.mapdl_grpc import MAX_MESSAGE_LENGTH, MapdlGrpc

MINIMUM_MAPDL_VERSION = "21.1"
FAILING_DATABASE_MAPDL = ["24.1", "24.2"]
Expand Down Expand Up @@ -280,7 +279,12 @@ def start(self, timeout=10):
self._server = {"ip": self._ip, "port": db_port}
self._channel_str = f"{self._ip}:{db_port}"

self._channel = grpc.insecure_channel(self._channel_str)
self._channel = grpc.insecure_channel(
self._channel_str,
options=[
("grpc.max_receive_message_length", MAX_MESSAGE_LENGTH),
],
)
self._state = grpc.channel_ready_future(self._channel)
self._stub = mapdl_db_pb2_grpc.MapdlDbServiceStub(self._channel)

Expand Down
106 changes: 75 additions & 31 deletions src/ansys/mapdl/core/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@
# SOFTWARE.

"""PyMAPDL specific errors"""

from functools import wraps
import signal
import threading
from typing import Callable, Optional

from grpc._channel import _InactiveRpcError, _MultiThreadedRendezvous
import grpc

from ansys.mapdl.core import LOG as logger

Expand Down Expand Up @@ -270,6 +269,13 @@ def __init__(self, msg=""):
super().__init__(msg)


class MapdlgRPCError(MapdlRuntimeError):
"""Raised when gRPC issues are found"""

def __init__(self, msg=""):
super().__init__(msg)


# handler for protect_grpc
def handler(sig, frame): # pragma: no cover
"""Pass signal to custom interrupt handler."""
Expand Down Expand Up @@ -302,35 +308,26 @@ def wrapper(*args, **kwargs):
# Capture gRPC exceptions
try:
out = func(*args, **kwargs)
except (_InactiveRpcError, _MultiThreadedRendezvous) as error:
# can't use isinstance here due to circular imports
try:
class_name = args[0].__class__.__name__
except:
class_name = ""

# trying to get "cmd" argument:
cmd = args[1] if len(args) >= 2 else ""
cmd = kwargs.get("cmd", cmd)

caller = func.__name__

if cmd:
msg_ = f"running:\n{cmd}\ncalled by:\n{caller}"
else:
msg_ = f"calling:{caller}\nwith the following arguments:\nargs: {list(*args)}\nkwargs: {list(**kwargs_)}"

if class_name == "MapdlGrpc":
mapdl = args[0]
elif hasattr(args[0], "_mapdl"):
mapdl = args[0]._mapdl

# Must close unfinished processes
mapdl._close_process()
raise MapdlExitedError(
f"MAPDL server connection terminated unexpectedly while {msg_}\nwith the following error\n{error}"
) from None

except grpc.RpcError as error:
# Custom errors
if error.code() == grpc.StatusCode.RESOURCE_EXHAUSTED:
if "Received message larger than max" in error.details():
try:
lim_ = int(error.details().split("(")[1].split("vs")[0])
except IndexError:
lim_ = int(512 * 1024**2)

raise MapdlgRPCError(
f"RESOURCE_EXHAUSTED: {error.details()}. "
"You can try to increase the gRPC message length size using 'PYMAPDL_MAX_MESSAGE_LENGTH'"
" environment variable. For instance:\n\n"
f"$ export PYMAPDL_MAX_MESSAGE_LENGTH={lim_}"
)

# Generic error
handle_generic_grpc_error(error, func, args, kwargs)

# No exceptions
if threading.current_thread().__class__.__name__ == "_MainThread":
received_interrupt = bool(SIGINT_TRACKER)

Expand All @@ -347,6 +344,53 @@ def wrapper(*args, **kwargs):
return wrapper


def handle_generic_grpc_error(error, func, args, kwargs):
"""Handle non-custom gRPC errors"""

# can't use isinstance here due to circular imports
try:
class_name = args[0].__class__.__name__
except (IndexError, AttributeError):
class_name = ""

# trying to get "cmd" argument:
cmd = args[1] if len(args) >= 2 else ""
cmd = kwargs.get("cmd", cmd)

caller = func.__name__

if cmd:
msg_ = f"running:\n {cmd}\ncalled by:\n {caller}\n"
else:
msg_ = f"calling:{caller}\nwith the following arguments:\n args: {args}\n kwargs: {kwargs}"

if class_name == "MapdlGrpc":
mapdl = args[0]
elif hasattr(args[0], "_mapdl"):
mapdl = args[0]._mapdl

msg = (
f"Error:\nMAPDL server connection terminated unexpectedly while {msg_}\n"
"Error:\n"
f" {error.details()}\n"
f"Full error:\n{error}"
)

# MAPDL gRPC is unavailable.
if error.code() == grpc.StatusCode.UNAVAILABLE:
raise MapdlExitedError(msg)

# Generic error
# Test if MAPDL is alive or not.
if mapdl.is_alive:
raise MapdlRuntimeError(msg)

else:
# Must close unfinished processes
mapdl._close_process()
raise MapdlExitedError(msg)


def protect_from(
exception, match: Optional[str] = None, condition: Optional[bool] = None
) -> Callable:
Expand Down
2 changes: 1 addition & 1 deletion src/ansys/mapdl/core/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ def _get_parameter_array(self, parm_name, shape):
if not escaped: # pragma: no cover
raise MapdlRuntimeError(
f"The array '{parm_name}' has a number format "
"that could not be read using '{format_str}'."
f"that could not be read using '{format_str}'."
)

arr_flat = np.fromstring(output, sep="\n").reshape(shape)
Expand Down
8 changes: 8 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@
reason="Skipping because not on local. ",
)

skip_if_not_remote = pytest.mark.skipif(
ON_LOCAL,
reason="Skipping because not on remote. ",
)

skip_if_on_cicd = pytest.mark.skipif(
ON_CI,
reason="""Skip if on CI/CD.""",
Expand Down Expand Up @@ -146,6 +151,9 @@ def requires(requirement: str):
elif "local" == requirement:
return skip_if_not_local

elif "remote" == requirement:
return skip_if_not_remote

elif "cicd" == requirement:
return skip_if_on_cicd

Expand Down
86 changes: 74 additions & 12 deletions tests/test_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
from ansys.mapdl.core.errors import (
MapdlCommandIgnoredError,
MapdlExitedError,
MapdlgRPCError,
MapdlRuntimeError,
)
from ansys.mapdl.core.mapdl_grpc import MAX_MESSAGE_LENGTH, MapdlGrpc
from ansys.mapdl.core.misc import random_string

PATH = os.path.dirname(os.path.abspath(__file__))
Expand Down Expand Up @@ -101,20 +103,20 @@ def setup_for_cmatrix(mapdl, cleared):
mapdl.run("/solu")


def test_connect_via_channel(mapdl):
"""Validate MapdlGrpc can be created directly from a channel."""

import grpc

from ansys.mapdl.core.mapdl_grpc import MAX_MESSAGE_LENGTH, MapdlGrpc

@pytest.fixture(scope="function")
def grpc_channel(mapdl):
channel = grpc.insecure_channel(
mapdl._channel_str,
options=[
("grpc.max_receive_message_length", MAX_MESSAGE_LENGTH),
],
)
mapdl = MapdlGrpc(channel=channel)
return channel


def test_connect_via_channel(grpc_channel):
"""Validate MapdlGrpc can be created directly from a channel."""
mapdl = MapdlGrpc(channel=grpc_channel)
assert mapdl.is_alive


Expand All @@ -139,11 +141,14 @@ def test_clear_multiple(mapdl):
mapdl.run("/CLEAR")


@pytest.mark.xfail(
reason="MAPDL bug 867421", raises=(MapdlExitedError, UnicodeDecodeError)
)
def test_invalid_get_bug(mapdl):
with pytest.raises((MapdlRuntimeError, MapdlCommandIgnoredError)):
# versions before 24.1 should raise an error
if mapdl.version < 24.1:
context = pytest.raises(MapdlCommandIgnoredError)
else:
context = pytest.raises((MapdlRuntimeError, MapdlCommandIgnoredError))

with context:
mapdl.get_value("ACTIVE", item1="SET", it1num="invalid")


Expand Down Expand Up @@ -588,3 +593,60 @@ def test_subscribe_to_channel(mapdl):
grpc.ChannelConnectivity.TRANSIENT_FAILURE,
grpc.ChannelConnectivity.SHUTDOWN,
]


@requires("remote")
def test_exception_message_length(mapdl):
# This test does not fail if running on local
channel = grpc.insecure_channel(
mapdl._channel_str,
options=[
("grpc.max_receive_message_length", int(1024)),
],
)
mapdl2 = MapdlGrpc(channel=channel)
assert mapdl2.is_alive

mapdl2.prep7()
mapdl2.dim("myarr", "", 1e5)
mapdl2.vfill("myarr", "rand", 0, 1) # filling array with random numbers

# Retrieving
with pytest.raises(MapdlgRPCError, match="Received message larger than max"):
values = mapdl2.parameters["myarr"]

assert mapdl2.is_alive

# Deleting generated mapdl instance and channel.
channel.close()
mapdl2._exited = True # To avoid side effects.
mapdl2.exit()


def test_generic_grpc_exception(monkeypatch, grpc_channel):
mapdl = MapdlGrpc(channel=grpc_channel)
assert mapdl.is_alive

class UnavailableError(grpc.RpcError):
def __init__(self, message="Service is temporarily unavailable."):
self._message = message
self._code = grpc.StatusCode.UNAVAILABLE
super().__init__(message)

def code(self):
return self._code

def details(self):
return self._message

def _raise_error_code(args, **kwargs):
raise UnavailableError()

monkeypatch.setattr(mapdl._stub, "SendCommand", _raise_error_code)

with pytest.raises(
MapdlExitedError, match="MAPDL server connection terminated unexpectedly while"
):
mapdl.prep7()

assert mapdl.is_alive

0 comments on commit a51a318

Please sign in to comment.