feat(downloader): add parallel downloading for M3U8 segments
Enhanced Bluesky media handling by implementing parallel downloading of M3U8 video segments. Segments are now fetched concurrently and merged into a single file, which speeds up video downloads; a minimal sketch of the pattern follows the change summary below.
HitaloM committed Nov 2, 2024
1 parent cee66f6 commit e3cf6d6
Showing 3 changed files with 80 additions and 54 deletions.
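
A minimal sketch of the fan-out/fan-in pattern behind this change (not the module's exact code; fetch_segments, playlist_url, and segment_uris are illustrative names): every segment listed in the M3U8 playlist is requested concurrently over one shared HTTP client, and the payloads come back in playlist order, ready to be merged.

import asyncio
from urllib.parse import urljoin

import httpx


async def fetch_segments(playlist_url: str, segment_uris: list[str]) -> list[bytes]:
    # Segment URIs in an M3U8 playlist are usually relative to the playlist URL.
    base_url = playlist_url.rsplit("/", 1)[0] + "/"

    async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:

        async def fetch(uri: str) -> bytes:
            response = await client.get(urljoin(base_url, uri))
            response.raise_for_status()
            return response.content

        # asyncio.gather runs every request concurrently and returns the
        # segment payloads in playlist order, ready to be merged.
        return await asyncio.gather(*(fetch(uri) for uri in segment_uris))

Sharing a single client lets the segment requests reuse pooled connections instead of opening a new connection per segment.
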
1 change: 1 addition & 0 deletions news/+m3u8.feature.rst
@@ -0,0 +1 @@
Enhanced Bluesky media handling with parallel downloading of video content. When downloading videos from Bluesky posts, the system now downloads multiple segments simultaneously, making video retrieval significantly faster.
1 change: 1 addition & 0 deletions news/+threadsthumbs.bugfix.rst
@@ -0,0 +1 @@
Fixed the automatic media download for Threads posts containing multiple media items.
132 changes: 78 additions & 54 deletions src/korone/modules/medias/utils/downloader.py
@@ -2,11 +2,12 @@
# Copyright (c) 2024 Hitalo M. <https://github.com/HitaloM>

import asyncio
import shutil
import tempfile
from contextlib import AsyncExitStack
from io import BytesIO
from pathlib import Path
from urllib.parse import urlparse
from urllib.parse import urljoin, urlparse

import aiofiles
import httpx
@@ -40,13 +41,10 @@ def is_bsky_thumbnail(media_url: str, mime_type: str) -> bool:
return "bsky.app" in media_url and mime_type == "application/octet-stream"


async def fetch_media_url(media_url: str) -> httpx.Response:
async with httpx.AsyncClient(
headers=GENERIC_HEADER, http2=True, timeout=20, follow_redirects=True, max_redirects=5
) as client:
response = await client.get(media_url)
response.raise_for_status()
return response
async def fetch_media_url(client: httpx.AsyncClient, media_url: str) -> httpx.Response:
response = await client.get(media_url)
response.raise_for_status()
return response


def get_file_extension(mime_type: str) -> str:
@@ -61,35 +59,33 @@ def convert_to_jpeg(image_data: BytesIO, file_path: Path) -> BytesIO:
return jpeg_buffer


async def download_segment(segment_url: str, stack: AsyncExitStack) -> Path | None:
async def download_segment(
session: httpx.AsyncClient, segment_url: str, temp_dir: Path
) -> Path | None:
try:
response = await fetch_media_url(segment_url)
content = await response.aread()
response = await session.get(segment_url)
response.raise_for_status()
content = response.content

temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".ts") # noqa: SIM115
stack.callback(lambda: Path(temp_file.name).unlink(missing_ok=True))
segment_name = Path(urlparse(segment_url).path).name
segment_path = temp_dir / segment_name

async with aiofiles.open(temp_file.name, mode="wb") as file:
async with aiofiles.open(segment_path, mode="wb") as file:
await file.write(content)

return Path(temp_file.name)
return segment_path
except Exception as error:
await logger.aerror("[Medias/Downloader] Failed to download segment: %s", error)
return None


async def merge_m3u8_segments(segment_files: list[Path], stack: AsyncExitStack) -> BytesIO | None:
async def merge_m3u8_segments(segment_files: list[Path], output_path: Path) -> bool:
try:
list_file = tempfile.NamedTemporaryFile(delete=False, suffix=".txt") # noqa: SIM115
stack.callback(lambda: Path(list_file.name).unlink(missing_ok=True))

async with aiofiles.open(list_file.name, mode="w") as f:
segments_list_path = output_path.parent / "segments.txt"
async with aiofiles.open(segments_list_path, mode="w") as f:
for segment_file in segment_files:
await f.write(f"file '{segment_file.resolve()}'\n")

output_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") # noqa: SIM115
stack.callback(lambda: Path(output_file.name).unlink(missing_ok=True))

ffmpeg_command = [
"ffmpeg",
"-y",
@@ -98,14 +94,16 @@ async def merge_m3u8_segments(segment_files: list[Path], stack: AsyncExitStack)
"-safe",
"0",
"-i",
list_file.name,
str(segments_list_path),
"-c",
"copy",
output_file.name,
str(output_path),
]

process = await asyncio.create_subprocess_exec(
*ffmpeg_command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
*ffmpeg_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()

@@ -116,36 +114,52 @@ async def merge_m3u8_segments(segment_files: list[Path], stack: AsyncExitStack)
stdout.decode(),
stderr.decode(),
)
return None
return False

async with aiofiles.open(output_file.name, mode="rb") as f:
merged_content = await f.read()
merged_file = BytesIO(merged_content)
merged_file.name = output_file.name
return merged_file
return True

except Exception as error:
await logger.aerror("[Medias/Downloader] Error merging segments: %s", error)
return None
return False


async def download_m3u8_playlist(media_url: str, content: bytes) -> BytesIO | None:
playlist = m3u8.loads(content.decode("utf-8"))
parsed_url = urlparse(media_url)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}{Path(parsed_url.path).parent}"
base_url = media_url.rsplit("/", 1)[0]

async with AsyncExitStack() as stack:
try:
segment_files = await asyncio.gather(*[
download_segment(f"{base_url}/{segment.uri}", stack)
temp_dir = Path(tempfile.mkdtemp())

stack.push_async_callback(
lambda: asyncio.to_thread(shutil.rmtree, temp_dir, ignore_errors=True)
)

async with httpx.AsyncClient(
headers=GENERIC_HEADER, http2=True, timeout=20, follow_redirects=True, max_redirects=5
) as session:
download_tasks = [
download_segment(session, urljoin(base_url + "/", segment.uri), temp_dir)
for segment in playlist.segments
])
valid_segment_files = [file for file in segment_files if file]
return await merge_m3u8_segments(valid_segment_files, stack)
except Exception as error:
await logger.aerror("[Medias/Downloader] Error downloading M3U8 segments: %s", error)
]
segment_files = await asyncio.gather(*download_tasks)
segment_files = [sf for sf in segment_files if sf is not None]

if not segment_files:
await logger.aerror("[Medias/Downloader] No segments were downloaded.")
return None

merged_output_path = temp_dir / "merged_video.mp4"
merge_success = await merge_m3u8_segments(segment_files, merged_output_path)

if not merge_success:
return None

async with aiofiles.open(merged_output_path, mode="rb") as f:
merged_content = await f.read()
merged_file = BytesIO(merged_content)
merged_file.name = "downloaded_media.mp4"
return merged_file


def resize_image(image: Image.Image) -> BytesIO:
width, height = image.size
@@ -181,9 +195,27 @@ def resize_image(image: Image.Image) -> BytesIO:
return buffer


def process_image(buffer: BytesIO, file_path: Path) -> BytesIO:
image = Image.open(buffer)
buffer.seek(0, 2) # Move to the end of the buffer

if (
buffer.tell() > MAX_FILE_SIZE
or image.width + image.height > MAX_DIMENSION
or image.width / image.height > MAX_RATIO
):
buffer = resize_image(image)
buffer.name = file_path.with_suffix(".jpeg").name

return buffer


async def download_media(media_url: str) -> BytesIO | None:
try:
response = await fetch_media_url(media_url)
async with httpx.AsyncClient(
headers=GENERIC_HEADER, http2=True, timeout=20, follow_redirects=True, max_redirects=5
) as client:
response = await fetch_media_url(client, media_url)
content_type: str = response.headers.get("content-type", "")

if not is_bsky_thumbnail(media_url, content_type) and not is_supported_mime_type(
@@ -192,7 +224,7 @@ async def download_media(media_url: str) -> BytesIO | None:
await logger.awarning("[Medias/Downloader] MIME type not supported: %s", content_type)
return None

raw_data = await response.aread()
raw_data = response.content
buffer = BytesIO(raw_data)
file_path = Path(urlparse(media_url).path)

@@ -212,15 +244,7 @@ async def download_media(media_url: str) -> BytesIO | None:
return convert_to_jpeg(buffer, file_path)

if content_type.startswith("image/"):
image = Image.open(buffer)
buffer.seek(0, 2) # Move to the end of the buffer
if (
buffer.tell() > MAX_FILE_SIZE
or image.width + image.height > MAX_DIMENSION
or image.width / image.height > MAX_RATIO
):
buffer = resize_image(image)
buffer.name = file_path.with_suffix(".jpeg").name
return process_image(buffer, file_path)

return buffer
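
The merge step above uses ffmpeg's concat demuxer: the downloaded .ts segments are listed, in order, in a small text file, and ffmpeg stitches them into one MP4 with "-c copy", i.e. without re-encoding. A minimal standalone sketch of that step, assuming ffmpeg is available on PATH (merge_segments, segment_paths, and output_path are illustrative names):

import asyncio
from pathlib import Path


async def merge_segments(segment_paths: list[Path], output_path: Path) -> bool:
    # The concat demuxer expects one "file '<path>'" entry per input segment.
    list_file = output_path.parent / "segments.txt"
    list_file.write_text("".join(f"file '{path.resolve()}'\n" for path in segment_paths))

    # "-safe 0" permits absolute paths in the list file; "-c copy" remuxes the
    # MPEG-TS segments into a single MP4 container without re-encoding.
    process = await asyncio.create_subprocess_exec(
        "ffmpeg", "-y", "-f", "concat", "-safe", "0",
        "-i", str(list_file), "-c", "copy", str(output_path),
    )
    return await process.wait() == 0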
