diff --git a/news/+m3u8.feature.rst b/news/+m3u8.feature.rst new file mode 100644 index 000000000..3181a6f3e --- /dev/null +++ b/news/+m3u8.feature.rst @@ -0,0 +1 @@ +Enhanced Bluesky media handling with parallel downloading of video content. When downloading videos from Bluesky posts, the system now downloads multiple segments simultaneously, making video retrieval significantly faster and more efficient. diff --git a/news/+threadsthumbs.bugfix.rst b/news/+threadsthumbs.bugfix.rst new file mode 100644 index 000000000..9724a212d --- /dev/null +++ b/news/+threadsthumbs.bugfix.rst @@ -0,0 +1 @@ +Fixed the automatic media download for Threads posts containing multiple media items. diff --git a/src/korone/modules/medias/utils/downloader.py b/src/korone/modules/medias/utils/downloader.py index 9579c5332..34422eccb 100644 --- a/src/korone/modules/medias/utils/downloader.py +++ b/src/korone/modules/medias/utils/downloader.py @@ -2,11 +2,12 @@ # Copyright (c) 2024 Hitalo M. import asyncio +import shutil import tempfile from contextlib import AsyncExitStack from io import BytesIO from pathlib import Path -from urllib.parse import urlparse +from urllib.parse import urljoin, urlparse import aiofiles import httpx @@ -40,13 +41,10 @@ def is_bsky_thumbnail(media_url: str, mime_type: str) -> bool: return "bsky.app" in media_url and mime_type == "application/octet-stream" -async def fetch_media_url(media_url: str) -> httpx.Response: - async with httpx.AsyncClient( - headers=GENERIC_HEADER, http2=True, timeout=20, follow_redirects=True, max_redirects=5 - ) as client: - response = await client.get(media_url) - response.raise_for_status() - return response +async def fetch_media_url(client: httpx.AsyncClient, media_url: str) -> httpx.Response: + response = await client.get(media_url) + response.raise_for_status() + return response def get_file_extension(mime_type: str) -> str: @@ -61,35 +59,33 @@ def convert_to_jpeg(image_data: BytesIO, file_path: Path) -> BytesIO: return jpeg_buffer -async def download_segment(segment_url: str, stack: AsyncExitStack) -> Path | None: +async def download_segment( + session: httpx.AsyncClient, segment_url: str, temp_dir: Path +) -> Path | None: try: - response = await fetch_media_url(segment_url) - content = await response.aread() + response = await session.get(segment_url) + response.raise_for_status() + content = response.content - temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".ts") # noqa: SIM115 - stack.callback(lambda: Path(temp_file.name).unlink(missing_ok=True)) + segment_name = Path(urlparse(segment_url).path).name + segment_path = temp_dir / segment_name - async with aiofiles.open(temp_file.name, mode="wb") as file: + async with aiofiles.open(segment_path, mode="wb") as file: await file.write(content) - return Path(temp_file.name) + return segment_path except Exception as error: await logger.aerror("[Medias/Downloader] Failed to download segment: %s", error) return None -async def merge_m3u8_segments(segment_files: list[Path], stack: AsyncExitStack) -> BytesIO | None: +async def merge_m3u8_segments(segment_files: list[Path], output_path: Path) -> bool: try: - list_file = tempfile.NamedTemporaryFile(delete=False, suffix=".txt") # noqa: SIM115 - stack.callback(lambda: Path(list_file.name).unlink(missing_ok=True)) - - async with aiofiles.open(list_file.name, mode="w") as f: + segments_list_path = output_path.parent / "segments.txt" + async with aiofiles.open(segments_list_path, mode="w") as f: for segment_file in segment_files: await f.write(f"file '{segment_file.resolve()}'\n") - output_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") # noqa: SIM115 - stack.callback(lambda: Path(output_file.name).unlink(missing_ok=True)) - ffmpeg_command = [ "ffmpeg", "-y", @@ -98,14 +94,16 @@ async def merge_m3u8_segments(segment_files: list[Path], stack: AsyncExitStack) "-safe", "0", "-i", - list_file.name, + str(segments_list_path), "-c", "copy", - output_file.name, + str(output_path), ] process = await asyncio.create_subprocess_exec( - *ffmpeg_command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + *ffmpeg_command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() @@ -116,36 +114,52 @@ async def merge_m3u8_segments(segment_files: list[Path], stack: AsyncExitStack) stdout.decode(), stderr.decode(), ) - return None + return False - async with aiofiles.open(output_file.name, mode="rb") as f: - merged_content = await f.read() - merged_file = BytesIO(merged_content) - merged_file.name = output_file.name - return merged_file + return True except Exception as error: await logger.aerror("[Medias/Downloader] Error merging segments: %s", error) - return None + return False async def download_m3u8_playlist(media_url: str, content: bytes) -> BytesIO | None: playlist = m3u8.loads(content.decode("utf-8")) - parsed_url = urlparse(media_url) - base_url = f"{parsed_url.scheme}://{parsed_url.netloc}{Path(parsed_url.path).parent}" + base_url = media_url.rsplit("/", 1)[0] async with AsyncExitStack() as stack: - try: - segment_files = await asyncio.gather(*[ - download_segment(f"{base_url}/{segment.uri}", stack) + temp_dir = Path(tempfile.mkdtemp()) + + stack.push_async_callback( + lambda: asyncio.to_thread(shutil.rmtree, temp_dir, ignore_errors=True) + ) + + async with httpx.AsyncClient( + headers=GENERIC_HEADER, http2=True, timeout=20, follow_redirects=True, max_redirects=5 + ) as session: + download_tasks = [ + download_segment(session, urljoin(base_url + "/", segment.uri), temp_dir) for segment in playlist.segments - ]) - valid_segment_files = [file for file in segment_files if file] - return await merge_m3u8_segments(valid_segment_files, stack) - except Exception as error: - await logger.aerror("[Medias/Downloader] Error downloading M3U8 segments: %s", error) + ] + segment_files = await asyncio.gather(*download_tasks) + segment_files = [sf for sf in segment_files if sf is not None] + + if not segment_files: + await logger.aerror("[Medias/Downloader] No segments were downloaded.") return None + merged_output_path = temp_dir / "merged_video.mp4" + merge_success = await merge_m3u8_segments(segment_files, merged_output_path) + + if not merge_success: + return None + + async with aiofiles.open(merged_output_path, mode="rb") as f: + merged_content = await f.read() + merged_file = BytesIO(merged_content) + merged_file.name = "downloaded_media.mp4" + return merged_file + def resize_image(image: Image.Image) -> BytesIO: width, height = image.size @@ -181,9 +195,27 @@ def resize_image(image: Image.Image) -> BytesIO: return buffer +def process_image(buffer: BytesIO, file_path: Path) -> BytesIO: + image = Image.open(buffer) + buffer.seek(0, 2) # Move to the end of the buffer + + if ( + buffer.tell() > MAX_FILE_SIZE + or image.width + image.height > MAX_DIMENSION + or image.width / image.height > MAX_RATIO + ): + buffer = resize_image(image) + buffer.name = file_path.with_suffix(".jpeg").name + + return buffer + + async def download_media(media_url: str) -> BytesIO | None: try: - response = await fetch_media_url(media_url) + async with httpx.AsyncClient( + headers=GENERIC_HEADER, http2=True, timeout=20, follow_redirects=True, max_redirects=5 + ) as client: + response = await fetch_media_url(client, media_url) content_type: str = response.headers.get("content-type", "") if not is_bsky_thumbnail(media_url, content_type) and not is_supported_mime_type( @@ -192,7 +224,7 @@ async def download_media(media_url: str) -> BytesIO | None: await logger.awarning("[Medias/Downloader] MIME type not supported: %s", content_type) return None - raw_data = await response.aread() + raw_data = response.content buffer = BytesIO(raw_data) file_path = Path(urlparse(media_url).path) @@ -212,15 +244,7 @@ async def download_media(media_url: str) -> BytesIO | None: return convert_to_jpeg(buffer, file_path) if content_type.startswith("image/"): - image = Image.open(buffer) - buffer.seek(0, 2) # Move to the end of the buffer - if ( - buffer.tell() > MAX_FILE_SIZE - or image.width + image.height > MAX_DIMENSION - or image.width / image.height > MAX_RATIO - ): - buffer = resize_image(image) - buffer.name = file_path.with_suffix(".jpeg").name + return process_image(buffer, file_path) return buffer