Skip to content

Commit

Permalink
Merge pull request #83 from ettoreleandrotognoli/dynamic-upload
Browse files Browse the repository at this point in the history
Add support to carousel with video and images
  • Loading branch information
diezo authored Mar 7, 2024
2 parents 5bc4c48 + 769607c commit 9d29395
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 96 deletions.
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ name = "pypi"
[packages]
requests = "*"
moviepy = "*"
pillow = "*"
cryptography = "*"
pyotp = "*"
ntplib = "*"
Expand Down
21 changes: 11 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,27 +142,27 @@ from ensta import Host

host = Host(username, password)

upload = host.get_upload_id("Picture.jpg")
upload = host.upload_image("Picture.jpg")

host.upload_photo(upload, caption="Travelling 🌆")
host.pub_photo(upload, caption="Travelling 🌆")
```

</details>

<details>

<summary>Upload Multiple Photos (Single Post)</summary><br>
<summary>Upload Multiple Medias (Single Post)</summary><br>

```python
from ensta import Host

host = Host(username, password)

upload1 = host.get_upload_id("First.jpg")
upload2 = host.get_upload_id("Second.jpg")
upload3 = host.get_upload_id("Third.jpg")
upload1 = host.upload_image("First.jpg")
upload2 = host.upload_image("Second.jpg")
upload3 = host.upload_video_for_carousel("Video.mp4", thumbnail="Thumbnail.jpg")

host.upload_photos([upload1, upload2, upload3], caption="Travelling 🌆")
host.pub_carousel([upload1, upload2, upload3], caption="Travelling 🌆")
```

</details>
Expand All @@ -176,9 +176,10 @@ from ensta import Host

host = Host(username, password)

host.upload_reel(
video_path="Video.mp4",
thumbnail_path="Thumbnail.jpg",
video_id = host.upload_video_for_reel("Video.mp4", thumbnail="Thumbnail.jpg")

host.pub_reel(
video_id,
caption="Enjoying the winter! ⛄"
)
```
Expand Down
177 changes: 91 additions & 86 deletions ensta/SessionHost.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import mimetypes
import tempfile
from functools import partial, partialmethod
import json
import random
import string
Expand All @@ -24,6 +26,7 @@
APIError,
ConversionError
)
from PIL import Image
from ensta.lib.Searcher import create_search_obj, search_comments
from urllib.parse import urlparse, parse_qs
from .Utils import time_id, fb_uploader
Expand All @@ -32,6 +35,27 @@
USERNAME, UID = 0, 1


IMAGE_RUPLOAD_PARAMS = {
"retry_context": "{\"num_step_auto_retry\": 0, \"num_reupload\": 0, \"num_step_manual_retry\": 0}",
"media_type": "1",
"image_compression": json.dumps({"lib_name": "moz", "lib_version": "3.1.m", "quality": 80})
}

REEL_RUPLOAD_PARAMS = {
"retry_context": "{\"num_step_auto_retry\": 0, \"num_reupload\": 0, \"num_step_manual_retry\": 0}",
"is_clips_video": "1",
"media_type": "2",
}

CAROUSEL_VIDEO_RUPLOAD_PARAMS = {
"retry_context": "{\"num_step_auto_retry\": 0, \"num_reupload\": 0, \"num_step_manual_retry\": 0}",
"is_unified_video": "0",
"is_clips_video": "0",
"is_sidecar": "1",
"media_type": "2",
}


class SessionHost:

session_data: str
Expand Down Expand Up @@ -854,33 +878,24 @@ def change_display_name(self, display_name: str) -> bool:
except JSONDecodeError:
raise NetworkError("HTTP Response is not a valid JSON.")

def get_upload_id(self, media_path: str, arg_upload_id: str | None = None) -> str:
def _upload_image(self, media: str, upload_id: str | None = None, **kwargs) -> str:
"""
Uploads the given images to Instagram's server and returns its unique ID which you can later use to configure single or multiple posts.
:param media_path: Path to the images file (only jpg & jpeg)
:param arg_upload_id: Custom upload_id (for advanced users)
:return: Upload ID of uploaded file
https://i.instagram.com/rupload_igphoto/
"""

media_path: Path = Path(media_path)

rupload_params = dict(kwargs)
media_path: Path = Path(media)
mimetype, _ = mimetypes.guess_type(media_path)

upload_id = arg_upload_id if arg_upload_id is not None else time_id()
upload_id = upload_id or time_id()
waterfall_id = str(uuid4())
upload_name = fb_uploader(upload_id)

rupload_params = {
"retry_context": "{\"num_step_auto_retry\": 0, \"num_reupload\": 0, \"num_step_manual_retry\": 0}",
"media_type": "1",
"xsharing_user_ids": "[]",
rupload_params.update(**{
"upload_id": upload_id,
"image_compression": json.dumps({"lib_name": "moz", "lib_version": "3.1.m", "quality": 80})
}
"xsharing_user_ids": json.dumps([self.user_id]),
})

with open(media_path, "rb") as file:
photo_data = file.read()
photo_length = str(len(photo_data))
image_data = file.read()
image_length = str(len(image_data))

request_headers = {
"accept-encoding": "gzip",
Expand All @@ -889,77 +904,70 @@ def get_upload_id(self, media_path: str, arg_upload_id: str | None = None) -> st
"x-entity-type": mimetype,
"offset": "0",
"x-entity-name": upload_name,
"x-entity-length": photo_length,
"x-entity-length": image_length,
"content-type": mimetype,
"content-length": photo_length
"content-length": image_length
}

http_response = self.request_session.post(
f"https://i.instagram.com/rupload_igphoto/{upload_name}",
data=photo_data,
data=image_data,
headers=request_headers
)

try:
response_json: dict = http_response.json()

if response_json.get("status", "") != "ok": raise NetworkError("Response json key 'status' not ok.")
if response_json.get("upload_id", "") == "": raise NetworkError(
"Key 'upload_id' in response json doesn't exist or is invalid."
)
if response_json.get("status", "") != "ok":
raise NetworkError("Response json key 'status' not ok.")
if response_json.get("upload_id", "") == "":
raise NetworkError(
"Key 'upload_id' in response json doesn't exist or is invalid."
)

return str(response_json.get("upload_id"))

except JSONDecodeError:
raise NetworkError("Response not a valid json.")

def __upload_video(self, path: str, arg_upload_id: str | None = None) -> tuple[bool, any, any, any]:
video_editor = moviepy.editor.VideoFileClip(path)

path: Path = Path(path)
def _upload_video(self, media: str, upload_id: str | None = None, thumbnail=0, **kwargs) -> str:
"""
https://i.instagram.com/rupload_igvideo/
video requires and image as thumbnail
"""
assert thumbnail != None
rupload_params = dict(kwargs)
media_path: Path = Path(media)
mimetype, _ = mimetypes.guess_type(media_path)
video_editor = moviepy.editor.VideoFileClip(media)
waterfall_id = str(uuid4())

upload_id = arg_upload_id if arg_upload_id is not None else time_id()
upload_id = upload_id or time_id()
upload_name = fb_uploader(upload_id)

rupload_params = {
"is_clips_video": "1",
"retry_context": "{\"num_step_auto_retry\": 0, \"num_reupload\": 0, \"num_step_manual_retry\": 0}",
"media_type": "2",
rupload_params.update(**{
"xsharing_user_ids": json.dumps([self.user_id]),
"upload_id": upload_id,
"upload_media_duration_ms": str(int(video_editor.duration * 1000)),
"upload_media_width": str(video_editor.size[0]),
"upload_media_height": str(video_editor.size[1])
}
"upload_media_height": str(video_editor.size[1]),
})

request_headers__get = {
"accept-encoding": "gzip",
"x-instagram-rupload-params": json.dumps(rupload_params),
"x_fb_video_waterfall_id": waterfall_id,
"x-entity-type": "video/mp4"
"x-entity-type": mimetype,
}

http_response__get = self.request_session.get(
f"https://i.instagram.com/rupload_igvideo/{upload_name}",
headers=request_headers__get
)

if http_response__get.status_code != 200: raise NetworkError(
"Video Upload 'GET' Request failed. Status code not 200."
)

# POST Request

with open(path, "rb") as file:
with open(media_path, "rb") as file:
video_data = file.read()
video_length = str(len(video_data))

request_headers = {
"offset": "0",
"x-entity-name": upload_name,
"x-entity-length": video_length,
"content-type": "application/octet-stream",
"content-type": mimetype,
"content-length": video_length,
**request_headers__get
}
Expand All @@ -969,19 +977,36 @@ def __upload_video(self, path: str, arg_upload_id: str | None = None) -> tuple[b
data=video_data,
headers=request_headers
)
response_json: dict = http_response.json()
if response_json.get("status", "") != "ok":
raise Exception()

if isinstance(thumbnail, (int, float)):
thumbnail_frame = video_editor.get_frame(thumbnail)
with tempfile.NamedTemporaryFile(suffix='.jpg') as thumbnail_file:
Image.fromarray(thumbnail_frame).save(thumbnail_file.name)
return self.upload_image(media=thumbnail_file.name, upload_id=upload_id)
return self.upload_image(media=thumbnail, upload_id=upload_id)

def upload_media(self, media: str, upload_id: str | None = None, **kwargs) -> str:

def unknown_type(*_, **__):
raise Exception(f'Unknown type {media}')

mimetype, _ = mimetypes.guess_type(media)
type, _ = mimetype.split('/')
methods = {
'image': self._upload_image,
'video': self._upload_video,
}
method = methods.get(type, unknown_type)
return method(media, upload_id, **kwargs)

try:
response_json: dict = http_response.json()

return response_json.get("status", "") == "ok",\
video_editor.duration,\
video_editor.size[0],\
video_editor.size[1]
upload_video_for_carousel = partialmethod(_upload_video, **CAROUSEL_VIDEO_RUPLOAD_PARAMS)
upload_video_for_reel = partialmethod(_upload_video, **REEL_RUPLOAD_PARAMS)
upload_image = partialmethod(_upload_image, **IMAGE_RUPLOAD_PARAMS)

except JSONDecodeError:
raise NetworkError("Response not a valid json.")

def upload_photo(
def pub_photo(
self,
upload_id: str,
caption: str = "",
Expand Down Expand Up @@ -1057,7 +1082,7 @@ def upload_photo(
except JSONDecodeError:
raise NetworkError("Response not a valid json.")

def upload_photos(
def pub_carousel(
self,
upload_ids: list[str],
caption: str = "",
Expand Down Expand Up @@ -1125,10 +1150,9 @@ def upload_photos(
except JSONDecodeError:
raise NetworkError("Response not a valid json.")

def upload_reel(
def pub_reel(
self,
video_path: str,
thumbnail_path: str,
upload_id: str,
caption: str = "",
alt_text: str = "",
archive_only: bool = False,
Expand All @@ -1148,25 +1172,6 @@ def upload_reel(
:param video_subtitles_enabled: Boolean (Should subtitles be enabled on this reel)
:return: ReelUpload
"""

upload_id = time_id()

video_success, video_duration, video_width, video_height = self.__upload_video(video_path, upload_id)

if not video_success: raise NetworkError(
"Error while uploading video to Instagram. Please try with a "
"different video, and make sure it's an MP4 video."
)

if not self.get_upload_id(
media_path=thumbnail_path,
arg_upload_id=upload_id
):
raise NetworkError(
"Error while uploading thumbnail to Instagram. Please make "
"sure your image is JPG or try with a different one."
)

request_headers: dict = {
"accept": "*/*",
"accept-language": "en-US,en;q=0.9",
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
install_requires=[
"requests",
"moviepy",
"pillow",
"cryptography",
"pyotp",
"ntplib",
Expand Down

0 comments on commit 9d29395

Please sign in to comment.