diff --git a/.gitignore b/.gitignore
index f12df3a5..369d6251 100644
--- a/.gitignore
+++ b/.gitignore
@@ -133,3 +133,4 @@ dmypy.json
 .idea/
 app*.py
+app*
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 45626e22..4b8aef12 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [1.0.1] - 2024-04-22
+
+### Added
+
+- Enabled Large File Upload and Page Iterator support
+
 ## [1.0.0] - 2023-10-31
 
 ### Added
diff --git a/src/msgraph_core/_constants.py b/src/msgraph_core/_constants.py
index dcbdcc6e..8dc32c1c 100644
--- a/src/msgraph_core/_constants.py
+++ b/src/msgraph_core/_constants.py
@@ -8,5 +8,5 @@
 """
 DEFAULT_REQUEST_TIMEOUT = 100
 DEFAULT_CONNECTION_TIMEOUT = 30
-SDK_VERSION = '1.0.0'
+SDK_VERSION = '1.0.1'
 MS_DEFAULT_SCOPE = 'https://graph.microsoft.com/.default'
diff --git a/src/msgraph_core/models/__init__.py b/src/msgraph_core/models/__init__.py
index 3bb44a19..dae79993 100644
--- a/src/msgraph_core/models/__init__.py
+++ b/src/msgraph_core/models/__init__.py
@@ -1,3 +1,5 @@
 from .page_result import PageResult
+from .large_file_upload_session import LargeFileUploadSession
+from .upload_result import UploadResult, UploadSessionDataHolder
 
-__all__ = ['PageResult']
+__all__ = ['PageResult', 'LargeFileUploadSession', 'UploadResult', 'UploadSessionDataHolder']
diff --git a/src/msgraph_core/models/large_file_upload_session.py b/src/msgraph_core/models/large_file_upload_session.py
new file mode 100644
index 00000000..4d5ee4f1
--- /dev/null
+++ b/src/msgraph_core/models/large_file_upload_session.py
@@ -0,0 +1,66 @@
+from __future__ import annotations
+from typing import Any, Callable, Dict, List, Optional, TYPE_CHECKING, Union
+import datetime
+from dataclasses import dataclass, field
+
+from kiota_abstractions.serialization import (
+    AdditionalDataHolder, Parsable, ParseNode, SerializationWriter
+)
+
+
+@dataclass
+class LargeFileUploadSession(AdditionalDataHolder, Parsable):
+
+    additional_data: Dict[str, Any] = field(default_factory=dict)
+    expiration_date_time: Optional[datetime.datetime] = None
+    next_expected_ranges: Optional[List[str]] = None
+    is_cancelled: Optional[bool] = False
+    odata_type: Optional[str] = None
+    # The URL endpoint that accepts PUT requests for byte ranges of the file.
+    upload_url: Optional[str] = None
+
+    @staticmethod
+    def create_from_discriminator_value(
+        parse_node: Optional[ParseNode] = None
+    ) -> LargeFileUploadSession:
+        """
+        Creates a new instance of the appropriate class based on the discriminator value.
+        param parse_node: The parse node to use to read the discriminator value
+        and create the object
+        Returns: LargeFileUploadSession
+        """
+        if not parse_node:
+            raise TypeError("parse_node cannot be null.")
+        return LargeFileUploadSession()
+
+    def get_field_deserializers(self, ) -> Dict[str, Callable[[ParseNode], None]]:
+        """
+        The deserialization information for the current model
+        Returns: Dict[str, Callable[[ParseNode], None]]
+        """
+        fields: Dict[str, Callable[[Any], None]] = {
+            "expirationDateTime":
+            lambda n: setattr(self, 'expiration_date_time', n.get_datetime_value()),
+            "nextExpectedRanges":
+            lambda n:
+            setattr(self, 'next_expected_ranges', n.get_collection_of_primitive_values(str)),
+            "@odata.type":
+            lambda n: setattr(self, 'odata_type', n.get_str_value()),
+            "uploadUrl":
+            lambda n: setattr(self, 'upload_url', n.get_str_value()),
+        }
+        return fields
+
+    def serialize(self, writer: SerializationWriter) -> None:
+        """
+        Serializes the current object
+        param writer: Serialization writer to use to serialize this model
+        Returns: None
+        """
+        if not writer:
+            raise TypeError("writer cannot be null.")
+        writer.write_datetime_value("expirationDateTime", self.expiration_date_time)
+        writer.write_collection_of_primitive_values("nextExpectedRanges", self.next_expected_ranges)
+        writer.write_str_value("@odata.type", self.odata_type)
+        writer.write_str_value("uploadUrl", self.upload_url)
+        writer.write_additional_data_value(self.additional_data)
diff --git a/src/msgraph_core/models/page_result.py b/src/msgraph_core/models/page_result.py
index 62e73979..7effd54e 100644
--- a/src/msgraph_core/models/page_result.py
+++ b/src/msgraph_core/models/page_result.py
@@ -10,14 +10,13 @@ PageResult: Represents a page of items in a paged response.
""" from __future__ import annotations -from typing import List, Optional, Dict, Callable +from typing import List, Optional, Dict, Callable, TypeVar from dataclasses import dataclass from kiota_abstractions.serialization.parsable import Parsable from kiota_abstractions.serialization.serialization_writer \ import SerializationWriter from kiota_abstractions.serialization.parse_node import ParseNode -from typing import TypeVar, List, Optional T = TypeVar('T') diff --git a/src/msgraph_core/models/upload_result.py b/src/msgraph_core/models/upload_result.py new file mode 100644 index 00000000..a8edf258 --- /dev/null +++ b/src/msgraph_core/models/upload_result.py @@ -0,0 +1,62 @@ +from typing import Any, Callable, Dict, List, Optional, TypeVar + +from dataclasses import dataclass +from datetime import datetime + +from kiota_abstractions.serialization import ( + AdditionalDataHolder, Parsable, ParseNode, SerializationWriter +) + +T = TypeVar('T') + + +@dataclass +class UploadSessionDataHolder(AdditionalDataHolder, Parsable): + expiration_date_time: Optional[datetime] = None + next_expected_ranges: Optional[List[str]] = None + upload_url: Optional[str] = None + odata_type: Optional[str] = None + + def get_field_deserializers(self, ) -> Dict[str, Callable[[ParseNode], None]]: + """ + The deserialization information for the current model + Returns: Dict[str, Callable[[ParseNode], None]] + """ + fields: Dict[str, Callable[[Any], None]] = { + "expirationDateTime": + lambda n: setattr(self, 'expiration_date_time', n.get_datetime_value()), + "nextExpectedRanges": + lambda n: + setattr(self, 'next_expected_ranges', n.get_collection_of_primitive_values(str)), + "@odata.type": + lambda n: setattr(self, 'odata_type', n.get_str_value()), + "uploadUrl": + lambda n: setattr(self, 'upload_url', n.get_str_value()), + } + return fields + + def serialize(self, writer: SerializationWriter) -> None: + """ + Serializes information the current object + param writer: Serialization writer to use to serialize this model + Returns: None + """ + if not writer: + raise TypeError("writer cannot be null.") + writer.write_datetime_value("expirationDateTime", self.expiration_date_time) + writer.write_collection_of_primitive_values("nextExpectedRanges", self.next_expected_ranges) + writer.write_str_value("@odata.type", self.odata_type) + writer.write_str_value("uploadUrl", self.upload_url) + writer.write_additional_data_value(self.additional_data) + + +class UploadResult: + + def __init__(self): + self.upload_session: Optional[UploadSessionDataHolder] = None + self.item_response: Optional[T] = None + self.location: Optional[str] = None + + @property + def upload_succeeded(self) -> bool: + return self.item_response is not None or self.location is not None diff --git a/src/msgraph_core/tasks/__init__.py b/src/msgraph_core/tasks/__init__.py index 266898cb..388e9cc4 100644 --- a/src/msgraph_core/tasks/__init__.py +++ b/src/msgraph_core/tasks/__init__.py @@ -1 +1,4 @@ from .page_iterator import PageIterator +from .large_file_upload import LargeFileUploadTask + +__all__ = ['PageIterator', 'LargeFileUploadTask'] diff --git a/src/msgraph_core/tasks/large_file_upload.py b/src/msgraph_core/tasks/large_file_upload.py new file mode 100644 index 00000000..a848caee --- /dev/null +++ b/src/msgraph_core/tasks/large_file_upload.py @@ -0,0 +1,305 @@ +import os +from typing import Callable, Optional, List, Tuple, Any, Dict +from io import BytesIO +from asyncio import Future +from datetime import datetime, timedelta, timezone +import logging + 
+from kiota_abstractions.serialization.parsable import Parsable
+from kiota_abstractions.method import Method
+from kiota_abstractions.headers_collection import HeadersCollection
+from kiota_abstractions.request_information import RequestInformation
+from kiota_abstractions.serialization.additional_data_holder import AdditionalDataHolder
+from kiota_abstractions.serialization.parsable_factory import ParsableFactory
+
+from kiota_abstractions.request_adapter import RequestAdapter
+
+from msgraph_core.models import LargeFileUploadSession, UploadResult, UploadSessionDataHolder
+
+
+class LargeFileUploadTask:
+
+    def __init__(
+        self,
+        upload_session: Parsable,
+        request_adapter: RequestAdapter,
+        stream: BytesIO,
+        parsable_factory: Optional[ParsableFactory] = None,
+        max_chunk_size: int = 5 * 1024 * 1024
+    ):
+        self._upload_session = upload_session
+        self._request_adapter = request_adapter
+        self.stream = stream
+        try:
+            self.file_size = stream.getbuffer().nbytes
+        except AttributeError:
+            self.file_size = os.stat(stream.name).st_size
+        self.max_chunk_size = max_chunk_size
+        self.factory = parsable_factory
+        cleaned_value = self.check_value_exists(
+            upload_session, 'get_next_expected_range', ['next_expected_range', 'NextExpectedRange']
+        )
+        self.next_range = cleaned_value[0]
+        self._chunks = int((self.file_size / max_chunk_size) + 0.5)
+        self.on_chunk_upload_complete: Optional[Callable[[List[int]], None]] = None
+
+    @property
+    def upload_session(self):
+        return self._upload_session
+
+    @upload_session.setter
+    def upload_session(self, value):
+        self._upload_session = value
+
+    @property
+    def request_adapter(self):
+        return self._request_adapter
+
+    @property
+    def chunks(self):
+        return self._chunks
+
+    @chunks.setter
+    def chunks(self, value):
+        self._chunks = value
+
+    def upload_session_expired(self, upload_session: Optional[Parsable] = None) -> bool:
+        now = datetime.now(timezone.utc)
+        upload_session = upload_session or self.upload_session
+        if not hasattr(upload_session, "expiration_date_time"):
+            raise ValueError("Upload session does not have an expiration date time")
+        expiry = upload_session.expiration_date_time
+        if expiry is None:
+            raise ValueError("Expiry is None")
+        if isinstance(expiry, str):
+            # Treat the parsed value as UTC so it can be compared with the timezone-aware `now`.
+            then = datetime.strptime(expiry, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc)
+        elif isinstance(expiry, datetime):
+            then = expiry
+        else:
+            raise ValueError("Expiry is not a string or datetime")
+        interval = now - then
+        if not isinstance(interval, timedelta):
+            raise ValueError("Interval is not a timedelta")
+        if interval.total_seconds() >= 0:
+            return True
+        return False
+
+    async def upload(self, after_chunk_upload: Optional[Callable] = None):
+        # Rewind at this point to take care of failures.
+        self.stream.seek(0)
+        if self.upload_session_expired(self.upload_session):
+            raise RuntimeError('The upload session is expired.')
+
+        self.on_chunk_upload_complete = after_chunk_upload or self.on_chunk_upload_complete
+        session = await self.next_chunk(
+            self.stream, 0, max(0, min(self.max_chunk_size - 1, self.file_size - 1))
+        )
+        process_next = session
+        # determine the range to be uploaded
+        # even when resuming existing upload sessions.
+        range_parts = self.next_range[0].split("-") if self.next_range else ['0', '0']
+        end = min(int(range_parts[0]) + self.max_chunk_size - 1, self.file_size)
+        uploaded_range = [range_parts[0], end]
+        response = None
+
+        while self.chunks >= 0:
+            session = process_next
+            logging.debug("Chunks remaining for upload: %s", self.chunks)
+            if self.chunks == 0:
+                # last chunk
+                logging.debug("Uploading last chunk")
+                response = await self.last_chunk(self.stream)
+                logging.debug("Last chunk response received")
+
+            try:
+                lfu_session: LargeFileUploadSession = session  # type: ignore
+                if lfu_session is None:
+                    continue
+                next_range = lfu_session.next_expected_ranges
+                old_url = self.get_validated_upload_url(self.upload_session)
+                lfu_session.upload_url = old_url
+                if self.on_chunk_upload_complete is not None:
+                    self.on_chunk_upload_complete(uploaded_range)
+                if not next_range:
+                    continue
+                range_parts = str(next_range[0]).split("-")
+                end = min(int(range_parts[0]) + self.max_chunk_size, self.file_size)
+                uploaded_range = [range_parts[0], end]
+                self.next_range = next_range[0] + "-"
+                process_next = await self.next_chunk(self.stream)
+
+            except Exception as error:
+                logging.error("Error uploading chunk %s", error)
+            finally:
+                self.chunks -= 1
+        upload_result = UploadResult()
+        upload_result.item_response = response
+        upload_result.location = self.upload_session.upload_url
+        return upload_result
+
+    @property
+    def next_range(self):
+        return self._next_range
+
+    @next_range.setter
+    def next_range(self, value: Optional[str]) -> None:
+        self._next_range = value
+
+    async def next_chunk(self, file: BytesIO, range_start: int = 0, range_end: int = 0) -> Future:
+        upload_url = self.get_validated_upload_url(self.upload_session)
+        if not upload_url:
+            raise ValueError('The upload session URL must not be empty.')
+        info = RequestInformation()
+        info.url = upload_url
+        info.http_method = Method.PUT
+        if not self.next_range:
+            self.next_range = f'{range_start}-{range_end}'
+        range_parts = self.next_range.split('-') if self.next_range else ['-']
+        start = int(range_parts[0])
+        end = int(range_parts[1]) if len(range_parts) > 1 else 0
+        if start == 0 and end == 0:
+            chunk_data = file.read(self.max_chunk_size)
+            end = min(self.max_chunk_size - 1, self.file_size - 1)
+        elif start == 0:
+            chunk_data = file.read(end + 1)
+        elif end == 0:
+            file.seek(start)
+            chunk_data = file.read(self.max_chunk_size)
+            end = start + len(chunk_data) - 1
+        else:
+            file.seek(start)
+            end = min(end, self.max_chunk_size + start)
+            chunk_data = file.read(end - start + 1)
+        info.headers = HeadersCollection()
+
+        info.headers.try_add('Content-Range', f'bytes {start}-{end}/{self.file_size}')
+        info.headers.try_add('Content-Length', str(len(chunk_data)))
+        info.headers.try_add("Content-Type", "application/octet-stream")
+        info.set_stream_content(bytes(chunk_data))
+        error_map: Dict[str, int] = {}
+        parsable_factory = LargeFileUploadSession
+        return await self.request_adapter.send_async(info, parsable_factory, error_map)
+
+    async def last_chunk(
+        self,
+        file: BytesIO,
+        range_start: int = 0,
+        range_end: int = 0,
+        parsable_factory: Optional[ParsableFactory] = None
+    ) -> Future:
+        upload_url = self.get_validated_upload_url(self.upload_session)
+        if not upload_url:
+            raise ValueError('The upload session URL must not be empty.')
+        info = RequestInformation()
+        info.url = upload_url
+        info.http_method = Method.PUT
+        if not self.next_range:
+            self.next_range = f'{range_start}-{range_end}'
+        range_parts = self.next_range.split('-') if self.next_range else ['-']
+        start = int(range_parts[0])
+        end = int(range_parts[1]) if len(range_parts) > 1 else 0
+        if start == 0 and end == 0:
+            chunk_data = file.read(self.max_chunk_size)
+            end = min(self.max_chunk_size - 1, self.file_size - 1)
+        elif start == 0:
+            chunk_data = file.read(end + 1)
+        elif end == 0:
+            file.seek(start)
+            chunk_data = file.read(self.max_chunk_size)
+            end = start + len(chunk_data) - 1
+        else:
+            file.seek(start)
+            end = min(end, self.max_chunk_size + start)
+            chunk_data = file.read(end - start + 1)
+        info.headers = HeadersCollection()
+
+        info.headers.try_add('Content-Range', f'bytes {start}-{end}/{self.file_size}')
+        info.headers.try_add('Content-Length', str(len(chunk_data)))
+        info.headers.try_add("Content-Type", "application/octet-stream")
+        info.set_stream_content(bytes(chunk_data))
+        error_map: Dict[str, int] = {}
+        parsable_factory = self.factory or parsable_factory
+        return await self.request_adapter.send_async(info, parsable_factory, error_map)
+
+    def get_file(self) -> BytesIO:
+        return self.stream
+
+    async def cancel(self) -> Optional[Future]:
+        upload_url = self.get_validated_upload_url(self.upload_session)
+        request_information = RequestInformation(method=Method.DELETE, url_template=upload_url)
+
+        await self.request_adapter.send_no_response_content_async(request_information)
+
+        if hasattr(self.upload_session, 'is_cancelled'):
+            self.upload_session.is_cancelled = True
+        elif hasattr(self.upload_session, 'additional_data'):
+            current = self.upload_session.additional_data
+            new = {**current, 'is_cancelled': True}
+            self.upload_session.additional_data = new
+
+        return self.upload_session
+
+    def additional_data_contains(self, parsable: Parsable,
+                                 property_candidates: List[str]) -> Tuple[bool, Any]:
+        if not issubclass(type(parsable), AdditionalDataHolder):
+            raise ValueError(
+                f'The object passed does not contain property/properties '
+                f'{",".join(property_candidates)} and does not implement '
+                f'AdditionalDataHolder'
+            )
+        additional_data = parsable.additional_data
+        for property_candidate in property_candidates:
+            if property_candidate in additional_data:
+                return True, additional_data[property_candidate]
+        return False, None
+
+    def check_value_exists(
+        self, parsable: Parsable, attribute_name: str, property_names_in_additional_data: List[str]
+    ) -> Tuple[bool, Any]:
+        checked_additional_data = self.additional_data_contains(
+            parsable, property_names_in_additional_data
+        )
+        if issubclass(type(parsable), AdditionalDataHolder) and checked_additional_data[0]:
+            return True, checked_additional_data[1]
+
+        if hasattr(parsable, attribute_name):
+            return True, getattr(parsable, attribute_name)
+
+        return False, None
+
+    async def resume(self) -> Future:
+        if self.upload_session_expired(self.upload_session):
+            raise RuntimeError('The upload session is expired.')
+
+        validated_value = self.check_value_exists(
+            self.upload_session, 'next_expected_ranges',
+            ['NextExpectedRanges', 'nextExpectedRanges']
+        )
+        if not validated_value[0]:
+            raise RuntimeError(
+                'The object passed does not contain a valid "nextExpectedRanges" property.'
+            )
+ ) + + next_ranges: List[str] = validated_value[1] + if len(next_ranges) == 0: + raise RuntimeError('No more bytes expected.') + + next_range = next_ranges[0] + self.next_range = next_range + return await self.upload() + + def get_validated_upload_url(self, upload_session: Parsable) -> str: + if not hasattr(upload_session, 'upload_url'): + raise RuntimeError('The upload session does not contain a valid upload url') + result = upload_session.upload_url + + if result is None or result.strip() == '': + raise RuntimeError('The upload URL cannot be empty.') + return result + + def get_next_range(self) -> Optional[str]: + return self.next_range + + def get_file_size(self) -> int: + return self.file_size