Skip to content

Commit

Permalink
Merge pull request #683 from microsoftgraph/shem/batch_requests
Browse files Browse the repository at this point in the history
Batch Requests
  • Loading branch information
shemogumbe authored Sep 19, 2024
2 parents b917d8c + 378318f commit 99e88a0
Show file tree
Hide file tree
Showing 13 changed files with 1,414 additions and 0 deletions.
Empty file.
144 changes: 144 additions & 0 deletions src/msgraph_core/requests/batch_request_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
from typing import TypeVar, Type, Dict, Optional, Union
import logging

from kiota_abstractions.request_adapter import RequestAdapter
from kiota_abstractions.request_information import RequestInformation
from kiota_abstractions.method import Method
from kiota_abstractions.serialization import Parsable
from kiota_abstractions.headers_collection import HeadersCollection
from kiota_abstractions.api_error import APIError

from .batch_request_content import BatchRequestContent
from .batch_request_content_collection import BatchRequestContentCollection
from .batch_response_content import BatchResponseContent
from .batch_response_content_collection import BatchResponseContentCollection

T = TypeVar('T', bound='Parsable')

APPLICATION_JSON = "application/json"


class BatchRequestBuilder:
"""
Provides operations to call the batch method.
"""

def __init__(
self,
request_adapter: RequestAdapter,
error_map: Optional[Dict[str, Type[Parsable]]] = None
):
if request_adapter is None:
raise ValueError("request_adapter cannot be Null.")
self._request_adapter = request_adapter
self.url_template = f"{self._request_adapter.base_url}/$batch"
self.error_map = error_map or {}

async def post(
self,
batch_request_content: Union[BatchRequestContent, BatchRequestContentCollection],
error_map: Optional[Dict[str, Type[Parsable]]] = None,
) -> Union[T, BatchResponseContentCollection]:
"""
Sends a batch request and returns the batch response content.
Args:
batch_request_content (Union[BatchRequestContent,
BatchRequestContentCollection]): The batch request content.
response_type: Optional[Type[T]] : The type to deserialize the response into.
Optional[Dict[str, Type[Parsable]]] = None:
Error mappings for response handling.
Returns:
Union[T, BatchResponseContentCollection]: The batch response content
or the specified response type.
"""
if batch_request_content is None:
raise ValueError("batch_request_content cannot be Null.")
response_type = BatchResponseContent

if isinstance(batch_request_content, BatchRequestContent):
request_info = await self.to_post_request_information(batch_request_content)
bytes_content = request_info.content
json_content = bytes_content.decode("utf-8")
updated_str = '{"requests":' + json_content + '}'
updated_bytes = updated_str.encode("utf-8")
request_info.content = updated_bytes
error_map = error_map or self.error_map
response = None
try:
response = await self._request_adapter.send_async(
request_info, response_type, error_map
)

except APIError as e:
logging.error("API Error: %s", e)
raise e
if response is None:
raise ValueError("Failed to get a valid response from the API.")
return response
if isinstance(batch_request_content, BatchRequestContentCollection):
batch_responses = await self._post_batch_collection(batch_request_content, error_map)
return batch_responses

raise ValueError("Invalid type for batch_request_content.")

async def _post_batch_collection(
self,
batch_request_content_collection: BatchRequestContentCollection,
error_map: Optional[Dict[str, Type[Parsable]]] = None,
) -> BatchResponseContentCollection:
"""
Sends a collection of batch requests and returns a collection of batch response contents.
Args:
batch_request_content_collection (BatchRequestContentCollection): The
collection of batch request contents.
Optional[Dict[str, Type[Parsable]]] = None:
Error mappings for response handling.
Returns:
BatchResponseContentCollection: The collection of batch response contents.
"""
if batch_request_content_collection is None:
raise ValueError("batch_request_content_collection cannot be Null.")

batch_responses = BatchResponseContentCollection()

for batch_request_content in batch_request_content_collection.batches:
request_info = await self.to_post_request_information(batch_request_content)
response = await self._request_adapter.send_async(
request_info, BatchResponseContent, error_map or self.error_map
)
batch_responses.add_response(response)

return batch_responses

async def to_post_request_information(
self, batch_request_content: BatchRequestContent
) -> RequestInformation:
"""
Creates request information for a batch POST request.
Args:
batch_request_content (BatchRequestContent): The batch request content.
Returns:
RequestInformation: The request information.
"""

if batch_request_content is None:
raise ValueError("batch_request_content cannot be Null.")
batch_request_items = list(batch_request_content.requests.values())

request_info = RequestInformation()
request_info.http_method = Method.POST
request_info.url_template = self.url_template
request_info.headers = HeadersCollection()
request_info.headers.try_add("Content-Type", APPLICATION_JSON)
request_info.set_content_from_parsable(
self._request_adapter, APPLICATION_JSON, batch_request_items
)

return request_info
140 changes: 140 additions & 0 deletions src/msgraph_core/requests/batch_request_content.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import uuid
from typing import List, Dict, Union, Optional

from kiota_abstractions.request_information import RequestInformation
from kiota_abstractions.serialization import Parsable, ParseNode
from kiota_abstractions.serialization import SerializationWriter

from .batch_request_item import BatchRequestItem


class BatchRequestContent(Parsable):
"""
Provides operations to call the batch method.
"""

MAX_REQUESTS = 20

def __init__(self, requests: Dict[str, Union['BatchRequestItem', 'RequestInformation']] = {}):
"""
Initializes a new instance of the BatchRequestContent class.
"""
self._requests: Dict[str, Union[BatchRequestItem, 'RequestInformation']] = requests or {}

self.is_finalized = False
for request_id, request in requests.items():
self.add_request(request_id, request)

@property
def requests(self) -> Dict:
"""
Gets the requests.
"""
return self._requests

@requests.setter
def requests(self, requests: List[BatchRequestItem]) -> None:
"""
Sets the requests.
"""
if len(requests) >= BatchRequestContent.MAX_REQUESTS:
raise ValueError(f"Maximum number of requests is {BatchRequestContent.MAX_REQUESTS}")
for request in requests:
self.add_request(request.id, request)

def add_request(self, request_id: Optional[str], request: BatchRequestItem) -> None:
"""
Adds a request to the batch request content.
"""
if len(self.requests) >= BatchRequestContent.MAX_REQUESTS:
raise RuntimeError(f"Maximum number of requests is {BatchRequestContent.MAX_REQUESTS}")
if not request.id:
request.id = str(uuid.uuid4())
if hasattr(request, 'depends_on') and request.depends_on:
for dependent_id in request.depends_on:
if dependent_id not in [req.id for req in self.requests]:
dependent_request = self._request_by_id(dependent_id)
if dependent_request:
self._requests[dependent_id] = dependent_request
self._requests[request.id] = request

def add_request_information(self, request_information: RequestInformation) -> None:
"""
Adds a request to the batch request content.
Args:
request_information (RequestInformation): The request information to add.
"""
request_id = str(uuid.uuid4())
self.add_request(request_id, BatchRequestItem(request_information))

def add_urllib_request(self, request) -> None:
"""
Adds a request to the batch request content.
"""
request_id = str(uuid.uuid4())
self.add_request(request_id, BatchRequestItem.create_with_urllib_request(request))

def remove(self, request_id: str) -> None:
"""
Removes a request from the batch request content.
Also removes the request from the depends_on list of
other requests.
"""
request_to_remove = None
for request in self.requests:
if request.id == request_id:
request_to_remove = request
if hasattr(request, 'depends_on') and request.depends_on:
if request_id in request.depends_on:
request.depends_on.remove(request_id)
if request_to_remove:
del self._requests[request_to_remove.id]
else:
raise ValueError(f"Request ID {request_id} not found in requests.")

def remove_batch_request_item(self, item: BatchRequestItem) -> None:
"""
Removes a request from the batch request content.
"""
self.remove(item.id)

def finalize(self):
"""
Finalizes the batch request content.
"""
self.is_finalized = True
return self._requests

def _request_by_id(self, request_id: str) -> Optional[BatchRequestItem]:
"""
Finds a request by its ID.
Args:
request_id (str): The ID of the request to find.
Returns:
The request with the given ID, or None if not found.
"""
return self._requests.get(request_id)

@staticmethod
def create_from_discriminator_value(
parse_node: Optional[ParseNode] = None
) -> 'BatchRequestContent':
if parse_node is None:
raise ValueError("parse_node cannot be None")
return BatchRequestContent()

def get_field_deserializers(self, ) -> Dict:
"""
The deserialization information for the current model
"""
return {}

def serialize(self, writer: SerializationWriter) -> None:
"""
Serializes information the current object
Args:
writer: Serialization writer to use to serialize this model
"""
writer.write_collection_of_object_values("requests", self.requests)
84 changes: 84 additions & 0 deletions src/msgraph_core/requests/batch_request_content_collection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from typing import List, Optional

from kiota_abstractions.request_information import RequestInformation
from kiota_abstractions.serialization import SerializationWriter

from .batch_request_content import BatchRequestContent
from .batch_request_item import BatchRequestItem


class BatchRequestContentCollection:
"""A collection of request content objects."""

def __init__(self) -> None:
"""
Initializes a new instance of the BatchRequestContentCollection class.
"""
self.max_requests_per_batch = BatchRequestContent.MAX_REQUESTS
self.batches: List[BatchRequestContent] = []
self.current_batch: BatchRequestContent = BatchRequestContent()

def add_batch_request_item(self, request: BatchRequestItem) -> None:
"""
Adds a request item to the collection.
Args:
request (BatchRequestItem): The request item to add.
"""
if len(self.current_batch.requests) >= self.max_requests_per_batch:
self.batches.append(self.current_batch.finalize())
self.current_batch = BatchRequestContent()
self.current_batch.add_request(request.id, request)
self.batches.append(self.current_batch)

def remove_batch_request_item(self, request_id: str) -> None:
"""
Removes a request item from the collection.
Args:
request_id (str): The ID of the request item to remove.
"""
for batch in self.batches:
if request_id in batch.requests:
del batch.requests[request_id]
return
if request_id in self.current_batch.requests:
del self.current_batch.requests[request_id]

def new_batch_with_failed_requests(self) -> Optional[BatchRequestContent]:
"""
Creates a new batch with failed requests.
Returns:
Optional[BatchRequestContent]: A new batch with failed requests.
"""
# Use IDs to get response status codes, generate new batch with failed requests
batch_with_failed_responses: Optional[BatchRequestContent] = BatchRequestContent()
for batch in self.batches:
for request in batch.requests:
if request.status_code not in [200, 201, 202, 203, 204, 205, 206, 207, 208, 226]:
if batch_with_failed_responses is not None:
batch_with_failed_responses.add_request(request.id, request)
else:
raise ValueError("batch_with_failed_responses is None")
return batch_with_failed_responses

def get_batch_requests_for_execution(self) -> List[BatchRequestContent]:
"""
Gets the batch requests for execution.
Returns:
List[BatchRequestContent]: The batch requests for execution.
"""
# if not self.current_batch.is_finalized:
# self.current_batch.finalize()
# self.batches.append(self.current_batch)
return self.batches

def serialize(self, writer: SerializationWriter) -> None:
"""
Serializes information the current object
Args:
writer: Serialization writer to use to serialize this model
"""
pass
# print(f"serializing {self.batches}")
# writer.write_collection_of_object_values("requests", self.batches)
Loading

0 comments on commit 99e88a0

Please sign in to comment.