Skip to content

Commit

Permalink
Async plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
smilingDima committed Jul 31, 2024
1 parent c80519e commit 9f5ee3e
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 2 deletions.
10 changes: 10 additions & 0 deletions src/zeep/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,16 @@ async def __aenter__(self):
async def __aexit__(self, exc_type=None, exc_value=None, traceback=None) -> None:
await self.transport.aclose()

async def create_message(self, service, operation_name, *args, **kwargs):
"""Create the payload for the given operation.
:rtype: lxml.etree._Element
"""
envelope, http_headers = await service._binding._create_async(
operation_name, args, kwargs, client=self
)
return envelope

class CachingClient(Client):
"""Shortcut to create a caching client, for the lazy people.
Expand Down
25 changes: 25 additions & 0 deletions src/zeep/plugins.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import typing
from collections import deque
from inspect import iscoroutinefunction


class Plugin:
Expand Down Expand Up @@ -37,6 +38,18 @@ def apply_egress(client, envelope, http_headers, operation, binding_options):
return envelope, http_headers


async def apply_egress_async(client, envelope, http_headers, operation, binding_options):
for plugin in client.plugins:
if iscoroutinefunction(plugin.egress):
result = await plugin.egress(envelope, http_headers, operation, binding_options)
else:
result = plugin.egress(envelope, http_headers, operation, binding_options)
if result is not None:
envelope, http_headers = result

return envelope, http_headers


def apply_ingress(client, envelope, http_headers, operation):
for plugin in client.plugins:
result = plugin.ingress(envelope, http_headers, operation)
Expand All @@ -46,6 +59,18 @@ def apply_ingress(client, envelope, http_headers, operation):
return envelope, http_headers


async def apply_ingress_async(client, envelope, http_headers, operation):
for plugin in client.plugins:
if iscoroutinefunction(plugin.egress):
result = await plugin.ingress(envelope, http_headers, operation)
else:
result = plugin.ingress(envelope, http_headers, operation)
if result is not None:
envelope, http_headers = result

return envelope, http_headers


class HistoryPlugin(Plugin):
def __init__(self, maxlen=1):
self._buffer = deque([], maxlen)
Expand Down
122 changes: 120 additions & 2 deletions src/zeep/wsdl/bindings/soap.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,52 @@ def _create(self, operation, args, kwargs, client=None, options=None):

return envelope, http_headers

async def _create_async(self, operation, args, kwargs, client=None, options=None):
"""Create the XML document to send to the server.
Note that this generates the soap envelope without the wsse applied.
"""
operation_obj = self.get(operation)
if not operation_obj:
raise ValueError("Operation %r not found" % operation)

# Create the SOAP envelope
serialized = operation_obj.create(*args, **kwargs)
self._set_http_headers(serialized, operation_obj)

envelope = serialized.content
http_headers = serialized.headers

# Apply ws-addressing
if client:
if not options:
options = client.service._binding_options

if operation_obj.abstract.wsa_action:
envelope, http_headers = wsa.WsAddressingPlugin().egress(
envelope, http_headers, operation_obj, options
)

# Apply plugins
envelope, http_headers = await plugins.apply_egress_async(
client, envelope, http_headers, operation_obj, options
)

# Apply WSSE
if client.wsse:
if isinstance(client.wsse, list):
for wsse in client.wsse:
envelope, http_headers = wsse.apply(envelope, http_headers)
else:
envelope, http_headers = client.wsse.apply(envelope, http_headers)

# Add extra http headers from the setings object
if client.settings.extra_http_headers:
http_headers.update(client.settings.extra_http_headers)

return envelope, http_headers

def send(self, client, options, operation, args, kwargs):
"""Called from the service
Expand Down Expand Up @@ -149,7 +195,7 @@ async def send_async(self, client, options, operation, args, kwargs):
:type kwargs: dict
"""
envelope, http_headers = self._create(
envelope, http_headers = await self._create_async(
operation, args, kwargs, client=client, options=options
)

Expand All @@ -161,7 +207,7 @@ async def send_async(self, client, options, operation, args, kwargs):
return response

operation_obj = self.get(operation)
return self.process_reply(client, operation_obj, response)
return await self.process_reply_async(client, operation_obj, response)

def process_reply(self, client, operation, response):
"""Process the XML reply from the server.
Expand Down Expand Up @@ -235,6 +281,78 @@ def process_reply(self, client, operation, response):
return message_pack
return result

async def process_reply_async(self, client, operation, response):
"""Process the XML reply from the server.
:param client: The client with which the operation was called
:type client: zeep.client.Client
:param operation: The operation object from which this is a reply
:type operation: zeep.wsdl.definitions.Operation
:param response: The response object returned by the remote server
:type response: requests.Response
"""
if response.status_code in (201, 202) and not response.content:
return None

elif response.status_code != 200 and not response.content:
raise TransportError(
"Server returned HTTP status %d (no content available)"
% response.status_code,
status_code=response.status_code,
)

content_type = response.headers.get("Content-Type", "text/xml")
media_type = get_media_type(content_type)
message_pack = None

# If the reply is a multipart/related then we need to retrieve all the
# parts
if media_type == "multipart/related":
decoder = MultipartDecoder(
response.content, content_type, response.encoding or "utf-8"
)
content = decoder.parts[0].content
if len(decoder.parts) > 1:
message_pack = MessagePack(parts=decoder.parts[1:])
else:
content = response.content

try:
doc = parse_xml(content, self.transport, settings=client.settings)
except XMLSyntaxError as exc:
raise TransportError(
"Server returned response (%s) with invalid XML: %s.\nContent: %r"
% (response.status_code, exc, response.content),
status_code=response.status_code,
content=response.content,
)

# Check if this is an XOP message which we need to decode first
if message_pack:
if process_xop(doc, message_pack):
message_pack = None

if client.wsse:
client.wsse.verify(doc)

doc, http_headers = await plugins.apply_ingress_async(
client, doc, response.headers, operation
)

# If the response code is not 200 or if there is a Fault node available
# then assume that an error occured.
fault_node = doc.find("soap-env:Body/soap-env:Fault", namespaces=self.nsmap)
if response.status_code != 200 or fault_node is not None:
return self.process_error(doc, operation)

result = operation.process_reply(doc)

if message_pack:
message_pack._set_root(result)
return message_pack
return result

def process_error(self, doc, operation):
raise NotImplementedError

Expand Down

0 comments on commit 9f5ee3e

Please sign in to comment.