From 18023ddb7d4302ae33565cb2b027d32899942d90 Mon Sep 17 00:00:00 2001 From: Ettore Leandro Tognoli Date: Thu, 28 Mar 2024 11:33:19 -0300 Subject: [PATCH] add MediaResolver to WebSession --- ensta/MediaResolver.py | 8 ++++---- ensta/WebSession.py | 11 ++++++++--- test_ensta/test_media_resolver.py | 4 ++-- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/ensta/MediaResolver.py b/ensta/MediaResolver.py index 6eebbd4..4870374 100644 --- a/ensta/MediaResolver.py +++ b/ensta/MediaResolver.py @@ -6,10 +6,10 @@ class MediaResolver: - def resolve_url(self, url: str): - parsed_url = urlparse(url) + def resolve_url(self, url: str) -> Path: + parsed_url = urlparse(str(url)) if not parsed_url.scheme: - return url + return Path(url) filename = Path(parsed_url.path).name _, file = tempfile.mkstemp(filename) response = requests.get(url) @@ -17,4 +17,4 @@ def resolve_url(self, url: str): with open(file, 'wb') as output_stream: for content in response.iter_content(): output_stream.write(content) - return file \ No newline at end of file + return Path(file) \ No newline at end of file diff --git a/ensta/WebSession.py b/ensta/WebSession.py index 355de77..f92b3df 100644 --- a/ensta/WebSession.py +++ b/ensta/WebSession.py @@ -1,4 +1,5 @@ import mimetypes +from ensta.MediaResolver import MediaResolver import tempfile from functools import partial, partialmethod import json @@ -72,12 +73,14 @@ class WebSession: "(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36" private_user_agent: str = "Instagram 269.0.0.18.75 Android (26/8.0.0; 480dpi; 1080x1920; " \ "OnePlus; 6T Dev; devitron; qcom; en_US; 314665256)" + media_resolver: MediaResolver = None def __init__( self, session_data: str, proxy: dict[str, str] = None, - skip_auth_verification: bool = False + skip_auth_verification: bool = False, + media_resolver: MediaResolver = None, ) -> None: self.session_data = session_data @@ -107,6 +110,8 @@ def __init__( raise SessionError( "SessionID expired. If you used a saved session, delete ensta-session.txt file and try again" ) + + self.media_resolver = media_resolver if media_resolver is not None else MediaResolver() def authenticated(self) -> bool: """ @@ -883,7 +888,7 @@ def _upload_image(self, media: str, upload_id: str | None = None, **kwargs) -> s https://i.instagram.com/rupload_igphoto/ """ rupload_params = dict(kwargs) - media_path: Path = Path(media) + media_path: Path = self.media_resolver.resolve_url(media) mimetype, _ = mimetypes.guess_type(media_path) upload_id = upload_id or time_id() waterfall_id = str(uuid4()) @@ -937,7 +942,7 @@ def _upload_video(self, media: str, upload_id: str | None = None, thumbnail=0, * """ assert thumbnail != None rupload_params = dict(kwargs) - media_path: Path = Path(media) + media_path: Path = self.media_resolver.resolve_url(media) mimetype, _ = mimetypes.guess_type(media_path) video_editor = moviepy.editor.VideoFileClip(media) waterfall_id = str(uuid4()) diff --git a/test_ensta/test_media_resolver.py b/test_ensta/test_media_resolver.py index fe5aa9c..08dbf3d 100644 --- a/test_ensta/test_media_resolver.py +++ b/test_ensta/test_media_resolver.py @@ -58,8 +58,8 @@ def setUp(self) -> None: def test_local(self): local = './test.txt' result = self.media_resolver.resolve_url(local) - self.assertEqual(result, local) - + self.assertEqual(result, Path(local)) + def test_http(self): url = self.get_url('with-content.txt') result = self.media_resolver.resolve_url(url)