From a5d50822223e290fb941cd4accc6b9991db1d191 Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Tue, 19 Nov 2024 09:40:35 +0000 Subject: [PATCH 01/20] adding ability for APIRequest to retry and save to a file --- .../langflow/components/data/api_request.py | 146 +++++++++++++++++- src/backend/base/pyproject.toml | 1 + 2 files changed, 139 insertions(+), 8 deletions(-) diff --git a/src/backend/base/langflow/components/data/api_request.py b/src/backend/base/langflow/components/data/api_request.py index f5f1aa52138..c8f44b2d052 100644 --- a/src/backend/base/langflow/components/data/api_request.py +++ b/src/backend/base/langflow/components/data/api_request.py @@ -1,14 +1,21 @@ import asyncio import json +import mimetypes +import re +import tempfile +from datetime import datetime +from zoneinfo import ZoneInfo +from pathlib import Path from typing import Any from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse import httpx +import validators from loguru import logger from langflow.base.curl.parse import parse_context from langflow.custom import Component -from langflow.io import DataInput, DropdownInput, IntInput, MessageTextInput, NestedDictInput, Output +from langflow.io import BoolInput, DataInput, DropdownInput, IntInput, MessageTextInput, NestedDictInput, Output from langflow.schema import Data from langflow.schema.dotdict import dotdict @@ -73,6 +80,20 @@ class APIRequestComponent(Component): value=5, info="The timeout to use for the request.", ), + BoolInput( + name="follow_redirects", + display_name="Follow Redirects", + value=True, + info="Whether to follow http redirects.", + advanced=True, + ), + BoolInput( + name="save_to_file", + display_name="Save to File", + value=False, + info="Save the API response to a temporary file", + advanced=True, + ), ] outputs = [ @@ -113,6 +134,9 @@ async def make_request( headers: dict | None = None, body: dict | None = None, timeout: int = 5, + *, + follow_redirects: bool = True, + save_to_file: bool = False, ) -> Data: method = method.upper() if method not in {"GET", "POST", "PATCH", "PUT", "DELETE"}: @@ -129,20 +153,58 @@ async def make_request( raise ValueError(msg) from e data = body or None + redirection_history = [] try: - response = await client.request(method, url, headers=headers, json=data, timeout=timeout) - try: - result = response.json() - except Exception: # noqa: BLE001 - logger.opt(exception=True).debug("Error decoding JSON response") - result = response.text + response = await client.request( + method, + url, + headers=headers, + json=data, + timeout=timeout, + follow_redirects=follow_redirects, + ) + + redirection_history = [ + {"url": str(redirect.url), "status_code": redirect.status_code} for redirect in response.history + ] + + if response.is_redirect: + redirection_history.append({"url": str(response.url), "status_code": response.status_code}) + + is_binary, file_path = self._response_info(response, with_file_path=save_to_file) + + if save_to_file: + mode = "wb" if is_binary else "w" + encoding = response.encoding if mode == "w" else None + with file_path.open(mode, encoding=encoding) as f: + f.write(response.content if is_binary else response.text) + + return Data( + data={ + "source": url, + "headers": headers, + "status_code": response.status_code, + "file_path": str(file_path), + **({"redirection_history": redirection_history} if redirection_history else {}), + }, + ) + # Populate result when not saving to a file + if is_binary: + result = response.content + else: + try: + result = response.json() + except 
Exception: # noqa: BLE001 + logger.opt(exception=True).debug("Error decoding JSON response") + result = response.text return Data( data={ "source": url, "headers": headers, "status_code": response.status_code, "result": result, + **({"redirection_history": redirection_history} if redirection_history else {}), }, ) except httpx.TimeoutException: @@ -162,6 +224,7 @@ async def make_request( "headers": headers, "status_code": 500, "error": str(exc), + **({"redirection_history": redirection_history} if redirection_history else {}), }, ) @@ -179,6 +242,13 @@ async def make_requests(self) -> list[Data]: headers = self.headers or {} body = self.body or {} timeout = self.timeout + follow_redirects = self.follow_redirects + save_to_file = self.save_to_file + + invalid_urls = [url for url in urls if not validators.url(url)] + if invalid_urls: + msg = f"Invalid URLs provided: {invalid_urls}" + raise ValueError(msg) if isinstance(self.query_params, str): query_params = dict(parse_qsl(self.query_params)) @@ -201,9 +271,69 @@ async def make_requests(self) -> list[Data]: async with httpx.AsyncClient() as client: results = await asyncio.gather( *[ - self.make_request(client, method, u, headers, rec, timeout) + self.make_request(client, method, u, headers, rec, timeout, follow_redirects=follow_redirects, save_to_file=save_to_file) for u, rec in zip(urls, bodies, strict=True) ] ) self.status = results return results + + def _response_info(self, response: httpx.Response, *, with_file_path: bool = False) -> tuple[bool, Path | None]: + """Determine the file path and whether the response content is binary. + + Args: + response (Response): The HTTP response object. + with_file_path (bool): Whether to save the response content to a file. + + Returns: + Tuple[bool, Path | None]: + A tuple containing a boolean indicating if the content is binary and the full file path (if applicable). 
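+
+        Example (illustrative only; the temporary directory shown is hypothetical):
+            A response carrying ``Content-Type: application/octet-stream`` and
+            ``Content-Disposition: attachment; filename="report.pdf"``, inspected with
+            ``with_file_path=True``, would yield something like::
+
+                (True, Path("/tmp/APIRequestComponent/report.pdf"))
+
+            If that filename already exists, a UTC timestamp is prefixed to keep it unique.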
+ """ + # Determine if the content is binary + content_type = response.headers.get("Content-Type", "") + is_binary = "application/octet-stream" in content_type or "application/binary" in content_type + + if not with_file_path: + return is_binary, None + + # Step 1: Set up a subdirectory for the component in the OS temp directory + component_temp_dir = Path(tempfile.gettempdir()) / self.__class__.__name__ + component_temp_dir.mkdir(parents=True, exist_ok=True) + + # Step 2: Extract filename from Content-Disposition + filename = None + if "Content-Disposition" in response.headers: + content_disposition = response.headers["Content-Disposition"] + filename_match = re.search(r'filename="(.+?)"', content_disposition) + if not filename_match: # Try to match RFC 5987 style + filename_match = re.search(r"filename\*=(?:UTF-8'')?(.+)", content_disposition) + if filename_match: + extracted_filename = filename_match.group(1) + # Ensure the filename is unique + if (component_temp_dir / extracted_filename).exists(): + timestamp = datetime.now(ZoneInfo("UTC")).strftime("%Y%m%d%H%M%S%f") + filename = f"{timestamp}-{extracted_filename}" + else: + filename = extracted_filename + + # Step 3: Infer file extension or use part of the request URL if no filename + if not filename: + # Extract the last segment of the URL path + url_path = urlparse(response.request.url).path + base_name = Path(url_path).name # Get the last segment of the path + if not base_name: # If the path ends with a slash or is empty + base_name = "response" + + # Infer file extension + extension = mimetypes.guess_extension(content_type.split(";")[0]) if content_type else None + if not extension: + extension = ".bin" if is_binary else ".txt" # Default extensions + + # Combine the base name with timestamp and extension + timestamp = datetime.now(ZoneInfo("UTC")).strftime("%Y%m%d%H%M%S%f") + filename = f"{timestamp}-{base_name}{extension}" + + # Step 4: Define the full file path + file_path = component_temp_dir / filename + + return is_binary, file_path diff --git a/src/backend/base/pyproject.toml b/src/backend/base/pyproject.toml index 20ff2d88384..cbd1b791158 100644 --- a/src/backend/base/pyproject.toml +++ b/src/backend/base/pyproject.toml @@ -162,6 +162,7 @@ dependencies = [ "fastapi-pagination>=0.12.29", "defusedxml>=0.7.1", "pypdf~=5.1.0", + "validators>=0.34.0", ] [project.urls] From fff6a23759af4f71132c4e05d49c6fb91ff3fe86 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 19 Nov 2024 09:43:06 +0000 Subject: [PATCH 02/20] [autofix.ci] apply automated fixes --- .../base/langflow/components/data/api_request.py | 15 ++++++++++++--- uv.lock | 2 ++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/backend/base/langflow/components/data/api_request.py b/src/backend/base/langflow/components/data/api_request.py index c8f44b2d052..4fb59059bf2 100644 --- a/src/backend/base/langflow/components/data/api_request.py +++ b/src/backend/base/langflow/components/data/api_request.py @@ -4,10 +4,10 @@ import re import tempfile from datetime import datetime -from zoneinfo import ZoneInfo from pathlib import Path from typing import Any from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse +from zoneinfo import ZoneInfo import httpx import validators @@ -134,7 +134,7 @@ async def make_request( headers: dict | None = None, body: dict | None = None, timeout: int = 5, - *, + *, follow_redirects: bool = True, save_to_file: bool = False, ) -> Data: @@ -271,7 +271,16 @@ async 
def make_requests(self) -> list[Data]: async with httpx.AsyncClient() as client: results = await asyncio.gather( *[ - self.make_request(client, method, u, headers, rec, timeout, follow_redirects=follow_redirects, save_to_file=save_to_file) + self.make_request( + client, + method, + u, + headers, + rec, + timeout, + follow_redirects=follow_redirects, + save_to_file=save_to_file, + ) for u, rec in zip(urls, bodies, strict=True) ] ) diff --git a/uv.lock b/uv.lock index 4b16d9f7db0..fbbf0e3fa12 100644 --- a/uv.lock +++ b/uv.lock @@ -3842,6 +3842,7 @@ dependencies = [ { name = "typer" }, { name = "uncurl" }, { name = "uvicorn" }, + { name = "validators" }, ] [package.optional-dependencies] @@ -4002,6 +4003,7 @@ requires-dist = [ { name = "types-requests", marker = "extra == 'dev'", specifier = ">=2.32.0" }, { name = "uncurl", specifier = ">=0.0.11" }, { name = "uvicorn", specifier = ">=0.30.0" }, + { name = "validators", specifier = ">=0.34.0" }, { name = "vulture", marker = "extra == 'dev'", specifier = ">=2.11" }, ] From 256936da95882e1c77405529399b43d26d1e46d6 Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Tue, 19 Nov 2024 09:40:35 +0000 Subject: [PATCH 03/20] adding ability for APIRequest to retry and save to a file --- .../langflow/components/data/api_request.py | 146 +++++++++++++++++- src/backend/base/pyproject.toml | 1 + 2 files changed, 139 insertions(+), 8 deletions(-) diff --git a/src/backend/base/langflow/components/data/api_request.py b/src/backend/base/langflow/components/data/api_request.py index f5f1aa52138..c8f44b2d052 100644 --- a/src/backend/base/langflow/components/data/api_request.py +++ b/src/backend/base/langflow/components/data/api_request.py @@ -1,14 +1,21 @@ import asyncio import json +import mimetypes +import re +import tempfile +from datetime import datetime +from zoneinfo import ZoneInfo +from pathlib import Path from typing import Any from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse import httpx +import validators from loguru import logger from langflow.base.curl.parse import parse_context from langflow.custom import Component -from langflow.io import DataInput, DropdownInput, IntInput, MessageTextInput, NestedDictInput, Output +from langflow.io import BoolInput, DataInput, DropdownInput, IntInput, MessageTextInput, NestedDictInput, Output from langflow.schema import Data from langflow.schema.dotdict import dotdict @@ -73,6 +80,20 @@ class APIRequestComponent(Component): value=5, info="The timeout to use for the request.", ), + BoolInput( + name="follow_redirects", + display_name="Follow Redirects", + value=True, + info="Whether to follow http redirects.", + advanced=True, + ), + BoolInput( + name="save_to_file", + display_name="Save to File", + value=False, + info="Save the API response to a temporary file", + advanced=True, + ), ] outputs = [ @@ -113,6 +134,9 @@ async def make_request( headers: dict | None = None, body: dict | None = None, timeout: int = 5, + *, + follow_redirects: bool = True, + save_to_file: bool = False, ) -> Data: method = method.upper() if method not in {"GET", "POST", "PATCH", "PUT", "DELETE"}: @@ -129,20 +153,58 @@ async def make_request( raise ValueError(msg) from e data = body or None + redirection_history = [] try: - response = await client.request(method, url, headers=headers, json=data, timeout=timeout) - try: - result = response.json() - except Exception: # noqa: BLE001 - logger.opt(exception=True).debug("Error decoding JSON response") - result = response.text + response = await client.request( + method, + url, + 
headers=headers, + json=data, + timeout=timeout, + follow_redirects=follow_redirects, + ) + + redirection_history = [ + {"url": str(redirect.url), "status_code": redirect.status_code} for redirect in response.history + ] + + if response.is_redirect: + redirection_history.append({"url": str(response.url), "status_code": response.status_code}) + + is_binary, file_path = self._response_info(response, with_file_path=save_to_file) + + if save_to_file: + mode = "wb" if is_binary else "w" + encoding = response.encoding if mode == "w" else None + with file_path.open(mode, encoding=encoding) as f: + f.write(response.content if is_binary else response.text) + + return Data( + data={ + "source": url, + "headers": headers, + "status_code": response.status_code, + "file_path": str(file_path), + **({"redirection_history": redirection_history} if redirection_history else {}), + }, + ) + # Populate result when not saving to a file + if is_binary: + result = response.content + else: + try: + result = response.json() + except Exception: # noqa: BLE001 + logger.opt(exception=True).debug("Error decoding JSON response") + result = response.text return Data( data={ "source": url, "headers": headers, "status_code": response.status_code, "result": result, + **({"redirection_history": redirection_history} if redirection_history else {}), }, ) except httpx.TimeoutException: @@ -162,6 +224,7 @@ async def make_request( "headers": headers, "status_code": 500, "error": str(exc), + **({"redirection_history": redirection_history} if redirection_history else {}), }, ) @@ -179,6 +242,13 @@ async def make_requests(self) -> list[Data]: headers = self.headers or {} body = self.body or {} timeout = self.timeout + follow_redirects = self.follow_redirects + save_to_file = self.save_to_file + + invalid_urls = [url for url in urls if not validators.url(url)] + if invalid_urls: + msg = f"Invalid URLs provided: {invalid_urls}" + raise ValueError(msg) if isinstance(self.query_params, str): query_params = dict(parse_qsl(self.query_params)) @@ -201,9 +271,69 @@ async def make_requests(self) -> list[Data]: async with httpx.AsyncClient() as client: results = await asyncio.gather( *[ - self.make_request(client, method, u, headers, rec, timeout) + self.make_request(client, method, u, headers, rec, timeout, follow_redirects=follow_redirects, save_to_file=save_to_file) for u, rec in zip(urls, bodies, strict=True) ] ) self.status = results return results + + def _response_info(self, response: httpx.Response, *, with_file_path: bool = False) -> tuple[bool, Path | None]: + """Determine the file path and whether the response content is binary. + + Args: + response (Response): The HTTP response object. + with_file_path (bool): Whether to save the response content to a file. + + Returns: + Tuple[bool, Path | None]: + A tuple containing a boolean indicating if the content is binary and the full file path (if applicable). 
+ """ + # Determine if the content is binary + content_type = response.headers.get("Content-Type", "") + is_binary = "application/octet-stream" in content_type or "application/binary" in content_type + + if not with_file_path: + return is_binary, None + + # Step 1: Set up a subdirectory for the component in the OS temp directory + component_temp_dir = Path(tempfile.gettempdir()) / self.__class__.__name__ + component_temp_dir.mkdir(parents=True, exist_ok=True) + + # Step 2: Extract filename from Content-Disposition + filename = None + if "Content-Disposition" in response.headers: + content_disposition = response.headers["Content-Disposition"] + filename_match = re.search(r'filename="(.+?)"', content_disposition) + if not filename_match: # Try to match RFC 5987 style + filename_match = re.search(r"filename\*=(?:UTF-8'')?(.+)", content_disposition) + if filename_match: + extracted_filename = filename_match.group(1) + # Ensure the filename is unique + if (component_temp_dir / extracted_filename).exists(): + timestamp = datetime.now(ZoneInfo("UTC")).strftime("%Y%m%d%H%M%S%f") + filename = f"{timestamp}-{extracted_filename}" + else: + filename = extracted_filename + + # Step 3: Infer file extension or use part of the request URL if no filename + if not filename: + # Extract the last segment of the URL path + url_path = urlparse(response.request.url).path + base_name = Path(url_path).name # Get the last segment of the path + if not base_name: # If the path ends with a slash or is empty + base_name = "response" + + # Infer file extension + extension = mimetypes.guess_extension(content_type.split(";")[0]) if content_type else None + if not extension: + extension = ".bin" if is_binary else ".txt" # Default extensions + + # Combine the base name with timestamp and extension + timestamp = datetime.now(ZoneInfo("UTC")).strftime("%Y%m%d%H%M%S%f") + filename = f"{timestamp}-{base_name}{extension}" + + # Step 4: Define the full file path + file_path = component_temp_dir / filename + + return is_binary, file_path diff --git a/src/backend/base/pyproject.toml b/src/backend/base/pyproject.toml index 20ff2d88384..cbd1b791158 100644 --- a/src/backend/base/pyproject.toml +++ b/src/backend/base/pyproject.toml @@ -162,6 +162,7 @@ dependencies = [ "fastapi-pagination>=0.12.29", "defusedxml>=0.7.1", "pypdf~=5.1.0", + "validators>=0.34.0", ] [project.urls] From 576be9cb4b2492b2a2cb52654dceb7be205c117f Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 19 Nov 2024 09:43:06 +0000 Subject: [PATCH 04/20] [autofix.ci] apply automated fixes --- .../base/langflow/components/data/api_request.py | 15 ++++++++++++--- uv.lock | 2 ++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/backend/base/langflow/components/data/api_request.py b/src/backend/base/langflow/components/data/api_request.py index c8f44b2d052..4fb59059bf2 100644 --- a/src/backend/base/langflow/components/data/api_request.py +++ b/src/backend/base/langflow/components/data/api_request.py @@ -4,10 +4,10 @@ import re import tempfile from datetime import datetime -from zoneinfo import ZoneInfo from pathlib import Path from typing import Any from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse +from zoneinfo import ZoneInfo import httpx import validators @@ -134,7 +134,7 @@ async def make_request( headers: dict | None = None, body: dict | None = None, timeout: int = 5, - *, + *, follow_redirects: bool = True, save_to_file: bool = False, ) -> Data: @@ -271,7 +271,16 @@ async 
def make_requests(self) -> list[Data]: async with httpx.AsyncClient() as client: results = await asyncio.gather( *[ - self.make_request(client, method, u, headers, rec, timeout, follow_redirects=follow_redirects, save_to_file=save_to_file) + self.make_request( + client, + method, + u, + headers, + rec, + timeout, + follow_redirects=follow_redirects, + save_to_file=save_to_file, + ) for u, rec in zip(urls, bodies, strict=True) ] ) diff --git a/uv.lock b/uv.lock index 4b16d9f7db0..fbbf0e3fa12 100644 --- a/uv.lock +++ b/uv.lock @@ -3842,6 +3842,7 @@ dependencies = [ { name = "typer" }, { name = "uncurl" }, { name = "uvicorn" }, + { name = "validators" }, ] [package.optional-dependencies] @@ -4002,6 +4003,7 @@ requires-dist = [ { name = "types-requests", marker = "extra == 'dev'", specifier = ">=2.32.0" }, { name = "uncurl", specifier = ">=0.0.11" }, { name = "uvicorn", specifier = ">=0.30.0" }, + { name = "validators", specifier = ">=0.34.0" }, { name = "vulture", marker = "extra == 'dev'", specifier = ">=2.11" }, ] From 983abfd28d6789a779b3225c977745ea6123e675 Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Tue, 19 Nov 2024 16:23:30 +0000 Subject: [PATCH 05/20] initial refactor of FileComponent to handle Data input --- .../base/langflow/components/data/file.py | 323 +++++++++++------- 1 file changed, 197 insertions(+), 126 deletions(-) diff --git a/src/backend/base/langflow/components/data/file.py b/src/backend/base/langflow/components/data/file.py index 8a5088aaea0..0ca062cb913 100644 --- a/src/backend/base/langflow/components/data/file.py +++ b/src/backend/base/langflow/components/data/file.py @@ -4,35 +4,45 @@ from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data from langflow.custom import Component -from langflow.io import BoolInput, FileInput, IntInput, Output +from langflow.io import BoolInput, FileInput, IntInput, Output, HandleInput from langflow.schema import Data class FileComponent(Component): - """Handles loading of individual or zipped text files. + """Handles loading and processing of individual or zipped text files. - Processes multiple valid files within a zip archive if provided. + This component supports processing multiple valid files within a zip archive, + resolving paths, validating file types, and optionally using multithreading for processing. Attributes: - display_name: Display name of the component. - description: Brief component description. - icon: Icon to represent the component. - name: Identifier for the component. - inputs: Inputs required by the component. - outputs: Output of the component after processing files. + display_name (str): Display name of the component. + description (str): Brief description of the component. + icon (str): Icon representing the component in the UI. + name (str): Identifier for the component. + inputs (list): Inputs required by the component, including file paths and processing options. + outputs (list): Outputs of the component after processing files, returning parsed data. """ - display_name = "File" description = "Load a file to be used in your project." 
icon = "file-text" name = "File" + SERVER_FILE_PATH_FIELDNAME = "file_path" + inputs = [ FileInput( name="path", display_name="Path", file_types=[*TEXT_FILE_TYPES, "zip"], info=f"Supported file types: {', '.join([*TEXT_FILE_TYPES, 'zip'])}", + required=False, + ), + HandleInput( + name="file_path", + display_name="Server File Path", + info=f"Data object with a '{SERVER_FILE_PATH_FIELDNAME}' property pointing to server file.", + required=False, + input_types=["Data"], ), BoolInput( name="silent_errors", @@ -53,159 +63,220 @@ class FileComponent(Component): info="The maximum number of workers to use, if concurrency is enabled", value=4, ), + BoolInput( + name="delete_server_file_after_processing", + display_name="Delete Server File After Processing", + advanced=True, + value=True, + info="If true, the Server File Path will be deleted after processing.", + ), ] outputs = [Output(display_name="Data", name="data", method="load_file")] - def load_file(self) -> Data: - """Load and parse file(s) from a zip archive. + def load_file(self) -> list[Data]: + """Loads and parses file(s), including unpacked zip files, with optional parallelism. + + This method processes files by resolving paths, validating file extensions, and optionally using + multithreading. Files within zip archives are unpacked and processed as individual files. Raises: - ValueError: If no file is uploaded or file path is invalid. + ValueError: If no valid file is provided, or if file extensions are unsupported. Returns: - Data: Parsed data from file(s). + list[Data]: A list of parsed data objects from the processed files. """ - # Check if the file path is provided - if not self.path: - self.log("File path is missing.") - msg = "Please upload a file for processing." - - raise ValueError(msg) - - resolved_path = Path(self.resolve_path(self.path)) + resolved_paths = self._resolve_paths() + + def process_file(file_path: Path) -> Data: + try: + self.log(f"Processing file: {file_path.name}.") + return self._process_single_file(file_path) + except FileNotFoundError as e: + msg = f"File not found: {file_path.name}. 
Error: {e}" + self.log(msg) + if not self.silent_errors: + raise e + return None + except Exception as e: + msg = f"Unexpected error processing {file_path.name}: {e}" + self.log(msg) + if not self.silent_errors: + raise e + return None + + valid_file_paths = [path for path, _ in resolved_paths if path.suffix in [f".{ext}" for ext in TEXT_FILE_TYPES]] + + if not self.use_multithreading: + self.log("Processing files sequentially.") + processed_data = [process_file(path) for path in valid_file_paths if path] + else: + self.log(f"Starting parallel processing with max workers: {self.concurrency_multithreading}.") + processed_data = parallel_load_data( + valid_file_paths, + silent_errors=self.silent_errors, + load_function=process_file, + max_concurrency=self.concurrency_multithreading, + ) + + # Cleanup and filter results try: - # Check if the file is a zip archive - if is_zipfile(resolved_path): - self.log(f"Processing zip file: {resolved_path.name}.") - - return self._process_zip_file( - resolved_path, - silent_errors=self.silent_errors, - parallel=self.use_multithreading, - ) - - self.log(f"Processing single file: {resolved_path.name}.") + return [data for data in processed_data if data] + finally: + for path, delete_after_processing in resolved_paths: + if delete_after_processing and path.exists(): + self.log(f"Deleting file: {path.name}.") + path.unlink() - return self._process_single_file(resolved_path, silent_errors=self.silent_errors) - except FileNotFoundError: - self.log(f"File not found: {resolved_path.name}.") + def _process_single_file(self, file_path: Path) -> Data: + """Processes a single file and returns parsed data. - raise - - def _process_zip_file(self, zip_path: Path, *, silent_errors: bool = False, parallel: bool = False) -> Data: - """Process text files within a zip archive. + This method reads the content of a file, validates its format, and parses it + into a `Data` object. Args: - zip_path: Path to the zip file. - silent_errors: Suppresses errors if True. - parallel: Enables parallel processing if True. + file_path (Path): Path to the file to be processed. Returns: - list[Data]: Combined data from all valid files. + Data: Parsed data from the file. Raises: - ValueError: If no valid files found in the archive. + ValueError: If the file cannot be parsed or is unsupported. """ - data: list[Data] = [] - with ZipFile(zip_path, "r") as zip_file: - # Filter file names based on extensions in TEXT_FILE_TYPES and ignore hidden files - valid_files = [ - name - for name in zip_file.namelist() - if ( - any(name.endswith(ext) for ext in TEXT_FILE_TYPES) - and not name.startswith("__MACOSX") - and not name.startswith(".") - ) - ] + data = parse_text_file_to_data(str(file_path), silent_errors=self.silent_errors) + return data or Data() - # Raise an error if no valid files found - if not valid_files: - self.log("No valid files in the zip archive.") - - # Return empty data if silent_errors is True - if silent_errors: - return data # type: ignore[return-value] + def _resolve_paths(self) -> list[tuple[Path, bool]]: + """Resolves a file path and validates its extension. - # Raise an error if no valid files found - msg = "No valid files in the zip archive." - raise ValueError(msg) + This method checks whether the file extension is supported (matching TEXT_FILE_TYPES or 'zip'). + It resolves the provided path and logs errors for unsupported file types. 
- # Define a function to process each file - def process_file(file_name, silent_errors=silent_errors): - with NamedTemporaryFile(delete=False) as temp_file: - temp_path = Path(temp_file.name).with_name(file_name) - with zip_file.open(file_name) as file_content: - temp_path.write_bytes(file_content.read()) - try: - return self._process_single_file(temp_path, silent_errors=silent_errors) - finally: - temp_path.unlink() - - # Process files in parallel if specified - if parallel: - self.log( - f"Initializing parallel Thread Pool Executor with max workers: " - f"{self.concurrency_multithreading}." - ) + Args: + path (str): The input file path to be resolved and validated. - # Process files in parallel - initial_data = parallel_load_data( - valid_files, - silent_errors=silent_errors, - load_function=process_file, - max_concurrency=self.concurrency_multithreading, - ) + Returns: + Path | None: The resolved file path if valid, or None if the file type is unsupported and silent_errors is enabled. - # Filter out empty data - data = list(filter(None, initial_data)) + Raises: + ValueError: If the file type is unsupported and silent_errors is disabled. + """ + resolved_paths = [] + + def add_path(path: str, to_remove: bool): + resolved_path = Path(self.resolve_path(path)) + if resolved_path.suffix not in [f".{ext}" for ext in [*TEXT_FILE_TYPES, "zip"]]: + msg = f"Unsupported file type: {resolved_path.suffix}" + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) else: - # Sequential processing - data = [process_file(file_name) for file_name in valid_files] - - self.log(f"Successfully processed zip file: {zip_path.name}.") - - return data # type: ignore[return-value] + resolved_paths.append((resolved_path, to_remove)) + + # Add self.path if provided; we do not delete these to preserve original behavior + if self.path: + add_path(self.path, False) + + # Add paths from file_path if provided + if self.file_path: + if isinstance(self.file_path, Data): + self.file_path = [self.file_path] + + if isinstance(self.file_path, list): + for obj in self.file_path: + if not isinstance(obj, Data): + msg = f"Unexpected type in file_path. Expected Data, got {type(obj)}." + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + continue + + server_file_path = obj.data.get(self.SERVER_FILE_PATH_FIELDNAME) + if server_file_path: + add_path(server_file_path, self.delete_server_file_after_processing) + else: + msg = f"One of the Data objects is missing the `{self.SERVER_FILE_PATH_FIELDNAME}` property." + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + else: + msg = f"Unexpected type in file_path. Expected list, got {type(self.file_path)}." + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + + # Unpack zip files and process valid extensions + final_paths = [] + for path, to_remove in resolved_paths: + final_paths.append((path, to_remove)) + if is_zipfile(path): + self.log(f"Unpacking zip file: {path.name}.") + # always remove zip file contents after processing + final_paths.extend((p, True) for p in self._unpack_zip_file(path)) + + return final_paths + + def _resolve_and_validate_path(self, path: str) -> Path | None: + """ + Resolves a given file path and validates its extension. - def _process_single_file(self, file_path: Path, *, silent_errors: bool = False) -> Data: - """Process a single file. + Checks if the file extension is supported (matches either TEXT_FILE_TYPES or 'zip'). 
+ Logs a message and optionally raises a ValueError if the file type is unsupported. Args: - file_path: Path to the file. - silent_errors: Suppresses errors if True. + path (str): The input file path to resolve and validate. Returns: - Data: Parsed data from the file. + Path | None: The resolved file path if valid, or None if the file type is unsupported and silent_errors is enabled. Raises: - ValueError: For unsupported file formats. + ValueError: If the file type is unsupported and silent_errors is False. """ - # Check if the file type is supported - if not any(file_path.suffix == ext for ext in ["." + f for f in TEXT_FILE_TYPES]): - self.log(f"Unsupported file type: {file_path.suffix}") + resolved_path = Path(self.resolve_path(path)) + if not any(resolved_path.suffix == f".{ext}" for ext in [*TEXT_FILE_TYPES, '.zip']): + msg = f"Unsupported file type: {resolved_path.suffix}" + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + return None + return resolved_path - # Return empty data if silent_errors is True - if silent_errors: - return Data() + def _unpack_zip_file(self, zip_path: Path) -> list[Path]: + """Unpacks a zip file and returns paths to its extracted files. - msg = f"Unsupported file type: {file_path.suffix}" - raise ValueError(msg) + This method extracts files from a zip archive, validating their extensions and ignoring + hidden or unsupported files. - try: - # Parse the text file as appropriate - data = parse_text_file_to_data(str(file_path), silent_errors=silent_errors) # type: ignore[assignment] - if not data: - data = Data() + Args: + zip_path (Path): The path to the zip file to be unpacked. + + Returns: + list[Path]: A list of paths to valid files extracted from the zip archive. - self.log(f"Successfully processed file: {file_path.name}.") - except Exception as e: - self.log(f"Error processing file {file_path.name}: {e}") + Raises: + ValueError: If the zip file contains no valid files or cannot be read. + """ + unpacked_files = [] + with ZipFile(zip_path, "r") as zip_file: + valid_files = [ + name for name in zip_file.namelist() + if ( + any(name.endswith(ext) for ext in TEXT_FILE_TYPES) + and not name.startswith((".", "__MACOSX")) + ) + ] - # Return empty data if silent_errors is True - if not silent_errors: - raise + if not valid_files: + msg = f"No valid files in the zip archive: {zip_path.name}." 
+ self.log(msg) + if not self.silent_errors: + raise ValueError(msg) - data = Data() + for file_name in valid_files: + with NamedTemporaryFile(delete=False) as temp_file: + temp_path = Path(temp_file.name).with_name(file_name) + with zip_file.open(file_name) as file_content: + temp_path.write_bytes(file_content.read()) + unpacked_files.append(temp_path) - return data + return unpacked_files From e2f3f467264ae72f9c8def48c38e5f202435d809 Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Wed, 20 Nov 2024 09:40:45 +0000 Subject: [PATCH 06/20] shifting potentially common logic into BaseFileComponent --- .../base/langflow/base/data/__init__.py | 5 + .../base/langflow/base/data/base_file.py | 255 ++++++++++++++++++ .../base/langflow/components/data/file.py | 234 ++-------------- 3 files changed, 279 insertions(+), 215 deletions(-) create mode 100644 src/backend/base/langflow/base/data/base_file.py diff --git a/src/backend/base/langflow/base/data/__init__.py b/src/backend/base/langflow/base/data/__init__.py index e69de29bb2d..8a92e12b04e 100644 --- a/src/backend/base/langflow/base/data/__init__.py +++ b/src/backend/base/langflow/base/data/__init__.py @@ -0,0 +1,5 @@ +from .base_file import BaseFileComponent + +__all__ = [ + "BaseFileComponent", +] diff --git a/src/backend/base/langflow/base/data/base_file.py b/src/backend/base/langflow/base/data/base_file.py new file mode 100644 index 00000000000..0882a70e4f2 --- /dev/null +++ b/src/backend/base/langflow/base/data/base_file.py @@ -0,0 +1,255 @@ +from abc import abstractmethod, ABC +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import Callable +from zipfile import ZipFile, is_zipfile +import tarfile + +from langflow.custom import Component +from langflow.io import BoolInput, FileInput, HandleInput, Output +from langflow.schema import Data + +class BaseFileComponent(Component, ABC): + """Base class for handling file processing components. + + This class provides common functionality for resolving, validating, and + processing file paths. Child classes must define valid file extensions + and implement the `process_files` method. 
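+
+    Example (an illustrative subclass; the component name and extension are hypothetical):
+
+        class CsvFileComponent(BaseFileComponent):
+            VALID_EXTENSIONS = ["csv"]
+
+            def process_files(self, file_list: list[Path]) -> list[Data]:
+                # Wrap each resolved path in a Data object; real parsing would go here.
+                return [Data(data={"file_path": str(path)}) for path in file_list]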
+ """ + + # Subclasses can override these class variables + VALID_EXTENSIONS = [] # To be overridden by child classes + IGNORE_STARTS_WITH = [".", "__MACOSX"] + + SERVER_FILE_PATH_FIELDNAME = "file_path" + SUPPORTED_BUNDLE_EXTENSIONS = ["zip", "tar", "tgz", "bz2", "gz"] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # Dynamically update FileInput to include valid extensions and bundles + self._base_inputs[0].file_types = [*self.valid_extensions, *self.SUPPORTED_BUNDLE_EXTENSIONS] + + file_types = ", ".join(self.valid_extensions) + bundles = ", ".join(self.SUPPORTED_BUNDLE_EXTENSIONS) + self._base_inputs[0].info = ( + f"Supported file extensions: {file_types}; optionally bundled in file extensions: {bundles}" + ) + + _base_inputs = [ + FileInput( + name="path", + display_name="Path", + file_types=[], # Dynamically set in __init__ + info="", # Dynamically set in __init__ + required=False, + ), + HandleInput( + name="file_path", + display_name="Server File Path", + info=f"Data object with a '{SERVER_FILE_PATH_FIELDNAME}' property pointing to server file.", + required=False, + input_types=["Data"], + ), + BoolInput( + name="silent_errors", + display_name="Silent Errors", + advanced=True, + info="If true, errors will not raise an exception.", + ), + BoolInput( + name="delete_server_file_after_processing", + display_name="Delete Server File After Processing", + advanced=True, + value=True, + info="If true, the Server File Path will be deleted after processing.", + ), + ] + + _base_outputs = [ + Output(display_name="Data", name="data", method="load_files") + ] + + @abstractmethod + def process_files(self, file_list: list[Path]) -> list[Data]: + """Processes a list of files and returns parsed data. + + Args: + file_list (list[Path]): A list of file paths to be processed. + + Returns: + list[Data]: A list of parsed data objects from the processed files. + """ + pass + + def load_files(self) -> list[Data]: + """Loads and parses file(s), including unpacked file bundles. + + This method resolves file paths, validates extensions, and delegates + file processing to the `process_files` method. + + Returns: + list[Data]: Parsed data from the processed files. + + Raises: + ValueError: If no valid file is provided or file extensions are unsupported. + """ + resolved_paths = self._resolve_paths() + + valid_file_paths = [ + path for path, _ in resolved_paths if path.suffix[1:] in self.valid_extensions + ] + + processed_data = self.process_files(valid_file_paths) + + try: + return [data for data in processed_data if data] + finally: + for path, delete_after_processing in resolved_paths: + if delete_after_processing and path.exists(): + path.unlink() + + @property + def valid_extensions(self) -> list[str]: + """Returns valid file extensions for the class. + + This property can be overridden by child classes to provide specific + extensions. + + Returns: + list[str]: A list of valid file extensions without the leading dot. + """ + return self.VALID_EXTENSIONS + + @property + def ignore_starts_with(self) -> list[str]: + """Returns prefixes to ignore when unpacking file bundles. + + Returns: + list[str]: A list of prefixes to ignore when unpacking file bundles. + """ + return self.IGNORE_STARTS_WITH + + def _resolve_paths(self) -> list[tuple[Path, bool]]: + """Resolves file paths and validates extensions. + + Returns: + list[tuple[Path, bool]]: Resolved paths and whether they should be removed after processing. + + Raises: + ValueError: If paths contain unsupported file extensions. 
+ """ + resolved_paths = [] + + def add_path(path: str, to_remove: bool): + resolved_path = Path(self.resolve_path(path)) + if resolved_path.suffix[1:] not in self.valid_extensions + self.SUPPORTED_BUNDLE_EXTENSIONS: + msg = f"Unsupported file type: {resolved_path.suffix}" + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + else: + resolved_paths.append((resolved_path, to_remove)) + + # Add self.path if provided + if self.path: + add_path(self.path, False) + + # Add paths from file_path if provided + if self.file_path: + if isinstance(self.file_path, Data): + self.file_path = [self.file_path] + + if isinstance(self.file_path, list): + for obj in self.file_path: + if not isinstance(obj, Data): + msg = f"Unexpected type in file_path. Expected Data, got {type(obj)}." + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + continue + + server_file_path = obj.data.get(self.SERVER_FILE_PATH_FIELDNAME) + if server_file_path: + add_path(server_file_path, self.delete_server_file_after_processing) + else: + msg = f"One of the Data objects is missing the `{self.SERVER_FILE_PATH_FIELDNAME}` property." + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + else: + msg = f"Unexpected type in file_path. Expected list, got {type(self.file_path)}." + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + + # Unpack file bundles + final_paths = [] + for path, to_remove in resolved_paths: + final_paths.append((path, to_remove)) + if path.suffix[1:] in self.SUPPORTED_BUNDLE_EXTENSIONS: + self.log(f"Unpacking file bundle: {path.name}.") + final_paths.extend((p, True) for p in self._unpack_file_bundle(path)) + + return final_paths + + def _unpack_file_bundle(self, bundle_path: Path) -> list[Path]: + """Unpacks a file bundle (zip, tar, tgz, etc.) and returns extracted file paths. + + Args: + bundle_path (Path): Path to the file bundle. + + Returns: + list[Path]: Paths to the extracted files. + + Raises: + ValueError: If the bundle contains no valid files or cannot be read. + """ + unpacked_files = [] + + def extract_files(list_files_callable: Callable, extract_file_callable: Callable): + """Helper to validate and extract files from the bundle.""" + valid_files = [ + file for file in list_files_callable() + if ( + any(file.endswith(f".{ext}") for ext in self.valid_extensions) + and not file.startswith(tuple(self.IGNORE_STARTS_WITH)) + ) + ] + + if not valid_files: + msg = f"No valid files in the bundle: {bundle_path.name}." 
+ self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + + for file in valid_files: + with NamedTemporaryFile(delete=False) as temp_file: + temp_path = Path(temp_file.name).with_name(file) + file_content = extract_file_callable(file) + if file_content: + temp_path.write_bytes(file_content.read()) + unpacked_files.append(temp_path) + + if is_zipfile(bundle_path): + with ZipFile(bundle_path, "r") as bundle: + extract_files( + list_files_callable=bundle.namelist, + extract_file_callable=bundle.open, + ) + elif tarfile.is_tarfile(bundle_path): + with tarfile.open(bundle_path, "r:*") as bundle: + extract_files( + list_files_callable=lambda: [ + member.name for member in bundle.getmembers() if member.isfile() + ], + extract_file_callable=lambda member_name: bundle.extractfile( + next(m for m in bundle.getmembers() if m.name == member_name) + ), + ) + else: + msg = f"Unsupported bundle format: {bundle_path.suffix}" + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + + return unpacked_files diff --git a/src/backend/base/langflow/components/data/file.py b/src/backend/base/langflow/components/data/file.py index 0ca062cb913..d3c5bffeb3a 100644 --- a/src/backend/base/langflow/components/data/file.py +++ b/src/backend/base/langflow/components/data/file.py @@ -1,55 +1,26 @@ from pathlib import Path -from tempfile import NamedTemporaryFile -from zipfile import ZipFile, is_zipfile +from langflow.base.data import BaseFileComponent from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data -from langflow.custom import Component -from langflow.io import BoolInput, FileInput, IntInput, Output, HandleInput +from langflow.io import BoolInput, IntInput from langflow.schema import Data -class FileComponent(Component): +class FileComponent(BaseFileComponent): """Handles loading and processing of individual or zipped text files. This component supports processing multiple valid files within a zip archive, resolving paths, validating file types, and optionally using multithreading for processing. - - Attributes: - display_name (str): Display name of the component. - description (str): Brief description of the component. - icon (str): Icon representing the component in the UI. - name (str): Identifier for the component. - inputs (list): Inputs required by the component, including file paths and processing options. - outputs (list): Outputs of the component after processing files, returning parsed data. """ display_name = "File" description = "Load a file to be used in your project." 
icon = "file-text" name = "File" - SERVER_FILE_PATH_FIELDNAME = "file_path" + VALID_EXTENSIONS = TEXT_FILE_TYPES inputs = [ - FileInput( - name="path", - display_name="Path", - file_types=[*TEXT_FILE_TYPES, "zip"], - info=f"Supported file types: {', '.join([*TEXT_FILE_TYPES, 'zip'])}", - required=False, - ), - HandleInput( - name="file_path", - display_name="Server File Path", - info=f"Data object with a '{SERVER_FILE_PATH_FIELDNAME}' property pointing to server file.", - required=False, - input_types=["Data"], - ), - BoolInput( - name="silent_errors", - display_name="Silent Errors", - advanced=True, - info="If true, errors will not raise an exception.", - ), + *BaseFileComponent._base_inputs, BoolInput( name="use_multithreading", display_name="Use Multithreading", @@ -63,35 +34,28 @@ class FileComponent(Component): info="The maximum number of workers to use, if concurrency is enabled", value=4, ), - BoolInput( - name="delete_server_file_after_processing", - display_name="Delete Server File After Processing", - advanced=True, - value=True, - info="If true, the Server File Path will be deleted after processing.", - ), ] - outputs = [Output(display_name="Data", name="data", method="load_file")] + outputs = [ + *BaseFileComponent._base_outputs, + ] - def load_file(self) -> list[Data]: - """Loads and parses file(s), including unpacked zip files, with optional parallelism. + def process_files(self, file_list: list[Path]) -> list[Data]: + """Processes a list of individual files and returns parsed data. - This method processes files by resolving paths, validating file extensions, and optionally using - multithreading. Files within zip archives are unpacked and processed as individual files. + This method supports optional multithreading for improved performance + when processing multiple files. - Raises: - ValueError: If no valid file is provided, or if file extensions are unsupported. + Args: + file_list (list[Path]): A list of file paths to be processed. Returns: list[Data]: A list of parsed data objects from the processed files. """ - resolved_paths = self._resolve_paths() - def process_file(file_path: Path) -> Data: try: self.log(f"Processing file: {file_path.name}.") - return self._process_single_file(file_path) + return parse_text_file_to_data(str(file_path), silent_errors=self.silent_errors) except FileNotFoundError as e: msg = f"File not found: {file_path.name}. Error: {e}" self.log(msg) @@ -105,178 +69,18 @@ def process_file(file_path: Path) -> Data: raise e return None - valid_file_paths = [path for path, _ in resolved_paths if path.suffix in [f".{ext}" for ext in TEXT_FILE_TYPES]] - if not self.use_multithreading: self.log("Processing files sequentially.") - processed_data = [process_file(path) for path in valid_file_paths if path] + processed_data = [process_file(file) for file in file_list if file] else: self.log(f"Starting parallel processing with max workers: {self.concurrency_multithreading}.") processed_data = parallel_load_data( - valid_file_paths, + file_list, silent_errors=self.silent_errors, load_function=process_file, max_concurrency=self.concurrency_multithreading, ) - # Cleanup and filter results - try: - return [data for data in processed_data if data] - finally: - for path, delete_after_processing in resolved_paths: - if delete_after_processing and path.exists(): - self.log(f"Deleting file: {path.name}.") - path.unlink() - - def _process_single_file(self, file_path: Path) -> Data: - """Processes a single file and returns parsed data. 
- - This method reads the content of a file, validates its format, and parses it - into a `Data` object. - - Args: - file_path (Path): Path to the file to be processed. - - Returns: - Data: Parsed data from the file. - - Raises: - ValueError: If the file cannot be parsed or is unsupported. - """ - data = parse_text_file_to_data(str(file_path), silent_errors=self.silent_errors) - return data or Data() - - def _resolve_paths(self) -> list[tuple[Path, bool]]: - """Resolves a file path and validates its extension. - - This method checks whether the file extension is supported (matching TEXT_FILE_TYPES or 'zip'). - It resolves the provided path and logs errors for unsupported file types. - - Args: - path (str): The input file path to be resolved and validated. - - Returns: - Path | None: The resolved file path if valid, or None if the file type is unsupported and silent_errors is enabled. - - Raises: - ValueError: If the file type is unsupported and silent_errors is disabled. - """ - resolved_paths = [] - - def add_path(path: str, to_remove: bool): - resolved_path = Path(self.resolve_path(path)) - if resolved_path.suffix not in [f".{ext}" for ext in [*TEXT_FILE_TYPES, "zip"]]: - msg = f"Unsupported file type: {resolved_path.suffix}" - self.log(msg) - if not self.silent_errors: - raise ValueError(msg) - else: - resolved_paths.append((resolved_path, to_remove)) - - # Add self.path if provided; we do not delete these to preserve original behavior - if self.path: - add_path(self.path, False) - - # Add paths from file_path if provided - if self.file_path: - if isinstance(self.file_path, Data): - self.file_path = [self.file_path] - - if isinstance(self.file_path, list): - for obj in self.file_path: - if not isinstance(obj, Data): - msg = f"Unexpected type in file_path. Expected Data, got {type(obj)}." - self.log(msg) - if not self.silent_errors: - raise ValueError(msg) - continue - - server_file_path = obj.data.get(self.SERVER_FILE_PATH_FIELDNAME) - if server_file_path: - add_path(server_file_path, self.delete_server_file_after_processing) - else: - msg = f"One of the Data objects is missing the `{self.SERVER_FILE_PATH_FIELDNAME}` property." - self.log(msg) - if not self.silent_errors: - raise ValueError(msg) - else: - msg = f"Unexpected type in file_path. Expected list, got {type(self.file_path)}." - self.log(msg) - if not self.silent_errors: - raise ValueError(msg) - - # Unpack zip files and process valid extensions - final_paths = [] - for path, to_remove in resolved_paths: - final_paths.append((path, to_remove)) - if is_zipfile(path): - self.log(f"Unpacking zip file: {path.name}.") - # always remove zip file contents after processing - final_paths.extend((p, True) for p in self._unpack_zip_file(path)) - - return final_paths - - def _resolve_and_validate_path(self, path: str) -> Path | None: - """ - Resolves a given file path and validates its extension. - - Checks if the file extension is supported (matches either TEXT_FILE_TYPES or 'zip'). - Logs a message and optionally raises a ValueError if the file type is unsupported. - - Args: - path (str): The input file path to resolve and validate. - - Returns: - Path | None: The resolved file path if valid, or None if the file type is unsupported and silent_errors is enabled. - - Raises: - ValueError: If the file type is unsupported and silent_errors is False. 
- """ - resolved_path = Path(self.resolve_path(path)) - if not any(resolved_path.suffix == f".{ext}" for ext in [*TEXT_FILE_TYPES, '.zip']): - msg = f"Unsupported file type: {resolved_path.suffix}" - self.log(msg) - if not self.silent_errors: - raise ValueError(msg) - return None - return resolved_path - - def _unpack_zip_file(self, zip_path: Path) -> list[Path]: - """Unpacks a zip file and returns paths to its extracted files. - - This method extracts files from a zip archive, validating their extensions and ignoring - hidden or unsupported files. - - Args: - zip_path (Path): The path to the zip file to be unpacked. - - Returns: - list[Path]: A list of paths to valid files extracted from the zip archive. - - Raises: - ValueError: If the zip file contains no valid files or cannot be read. - """ - unpacked_files = [] - with ZipFile(zip_path, "r") as zip_file: - valid_files = [ - name for name in zip_file.namelist() - if ( - any(name.endswith(ext) for ext in TEXT_FILE_TYPES) - and not name.startswith((".", "__MACOSX")) - ) - ] - - if not valid_files: - msg = f"No valid files in the zip archive: {zip_path.name}." - self.log(msg) - if not self.silent_errors: - raise ValueError(msg) - - for file_name in valid_files: - with NamedTemporaryFile(delete=False) as temp_file: - temp_path = Path(temp_file.name).with_name(file_name) - with zip_file.open(file_name) as file_content: - temp_path.write_bytes(file_content.read()) - unpacked_files.append(temp_path) + # Filter out empty results and return + return [data for data in processed_data if data] - return unpacked_files From 3cdc952b93fd028eb1e45c83217a6fec27150414 Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Wed, 20 Nov 2024 14:03:15 +0000 Subject: [PATCH 07/20] improving readability and fixing problems --- .../base/langflow/base/data/base_file.py | 254 +++++++++++------- .../base/langflow/components/data/file.py | 34 +-- 2 files changed, 173 insertions(+), 115 deletions(-) diff --git a/src/backend/base/langflow/base/data/base_file.py b/src/backend/base/langflow/base/data/base_file.py index 0882a70e4f2..a7432464fbe 100644 --- a/src/backend/base/langflow/base/data/base_file.py +++ b/src/backend/base/langflow/base/data/base_file.py @@ -1,9 +1,9 @@ from abc import abstractmethod, ABC from pathlib import Path -from tempfile import NamedTemporaryFile -from typing import Callable +from tempfile import TemporaryDirectory from zipfile import ZipFile, is_zipfile import tarfile +import shutil from langflow.custom import Component from langflow.io import BoolInput, FileInput, HandleInput, Output @@ -46,7 +46,10 @@ def __init__(self, *args, **kwargs): HandleInput( name="file_path", display_name="Server File Path", - info=f"Data object with a '{SERVER_FILE_PATH_FIELDNAME}' property pointing to server file.", + info=( + f"Data object with a '{SERVER_FILE_PATH_FIELDNAME}' property pointing to server file. " + "Supercedes 'Path'. " + ), required=False, input_types=["Data"], ), @@ -63,6 +66,13 @@ def __init__(self, *args, **kwargs): value=True, info="If true, the Server File Path will be deleted after processing.", ), + BoolInput( + name="ignore_unsupported_extensions", + display_name="Ignore Unsupported Extensions", + advanced=True, + value=True, + info="If true, files with unsupported extensions will not be processed.", + ), ] _base_outputs = [ @@ -93,20 +103,45 @@ def load_files(self) -> list[Data]: Raises: ValueError: If no valid file is provided or file extensions are unsupported. 
""" - resolved_paths = self._resolve_paths() + # List to keep track of temporary directories + self._temp_dirs = [] + final_files_with_flags = [] + try: + # Step 1: Validate the provided paths + paths_with_flags = self._validate_and_resolve_paths() - valid_file_paths = [ - path for path, _ in resolved_paths if path.suffix[1:] in self.valid_extensions - ] + # self.log(f"paths_with_flags: {paths_with_flags}") - processed_data = self.process_files(valid_file_paths) + # Step 2: Handle bundles recursively + all_files_with_flags = self._unpack_and_collect_files(paths_with_flags) + + # self.log(f"all_files_with_flags: {all_files_with_flags}") + + # Step 3: Final validation of file types and remove-after-processing markers + final_files_with_flags = self._filter_and_mark_files(all_files_with_flags) + + # self.log(f"final_files_with_flags: {final_files_with_flags}") + + # Extract just the paths for processing + valid_file_paths = [path for path, _ in final_files_with_flags] + + # self.log(f"valid_file_paths: {valid_file_paths}") + + processed_data = self.process_files(valid_file_paths) - try: return [data for data in processed_data if data] finally: - for path, delete_after_processing in resolved_paths: + # Delete temporary directories + for temp_dir in self._temp_dirs: + temp_dir.cleanup() + + # Delete files marked for deletion + for path, delete_after_processing in final_files_with_flags: if delete_after_processing and path.exists(): - path.unlink() + if path.is_dir(): + shutil.rmtree(path) + else: + path.unlink() @property def valid_extensions(self) -> list[str]: @@ -129,127 +164,146 @@ def ignore_starts_with(self) -> list[str]: """ return self.IGNORE_STARTS_WITH - def _resolve_paths(self) -> list[tuple[Path, bool]]: - """Resolves file paths and validates extensions. + def _validate_and_resolve_paths(self) -> list[tuple[Path, bool]]: + """Validate that all input paths exist and are valid. Returns: - list[tuple[Path, bool]]: Resolved paths and whether they should be removed after processing. + list[tuple[Path, bool]]: A list of valid paths and whether they should be deleted after processing. Raises: - ValueError: If paths contain unsupported file extensions. + ValueError: If any path does not exist. """ resolved_paths = [] - def add_path(path: str, to_remove: bool): + def add_path(path: str, *, delete_after_processing: bool): resolved_path = Path(self.resolve_path(path)) - if resolved_path.suffix[1:] not in self.valid_extensions + self.SUPPORTED_BUNDLE_EXTENSIONS: - msg = f"Unsupported file type: {resolved_path.suffix}" + if not resolved_path.exists(): + msg = f"File or directory not found: {path}" self.log(msg) if not self.silent_errors: raise ValueError(msg) - else: - resolved_paths.append((resolved_path, to_remove)) - - # Add self.path if provided - if self.path: - add_path(self.path, False) + resolved_paths.append((resolved_path, delete_after_processing)) - # Add paths from file_path if provided - if self.file_path: + if self.path and not self.file_path: # Only process self.path if file_path is not provided + add_path(self.path, delete_after_processing=False) # Files from self.path are never deleted + elif self.file_path: if isinstance(self.file_path, Data): self.file_path = [self.file_path] - if isinstance(self.file_path, list): - for obj in self.file_path: - if not isinstance(obj, Data): - msg = f"Unexpected type in file_path. Expected Data, got {type(obj)}." 
- self.log(msg) - if not self.silent_errors: - raise ValueError(msg) - continue - - server_file_path = obj.data.get(self.SERVER_FILE_PATH_FIELDNAME) - if server_file_path: - add_path(server_file_path, self.delete_server_file_after_processing) - else: - msg = f"One of the Data objects is missing the `{self.SERVER_FILE_PATH_FIELDNAME}` property." - self.log(msg) - if not self.silent_errors: - raise ValueError(msg) - else: - msg = f"Unexpected type in file_path. Expected list, got {type(self.file_path)}." - self.log(msg) - if not self.silent_errors: - raise ValueError(msg) + for obj in self.file_path: + if not isinstance(obj, Data): + msg = f"Expected Data object in file_path but got {type(obj)}." + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + continue - # Unpack file bundles - final_paths = [] - for path, to_remove in resolved_paths: - final_paths.append((path, to_remove)) - if path.suffix[1:] in self.SUPPORTED_BUNDLE_EXTENSIONS: - self.log(f"Unpacking file bundle: {path.name}.") - final_paths.extend((p, True) for p in self._unpack_file_bundle(path)) + server_file_path = obj.data.get(self.SERVER_FILE_PATH_FIELDNAME) + if server_file_path: + add_path(server_file_path, delete_after_processing=self.delete_server_file_after_processing) + else: + msg = f"Data object missing '{self.SERVER_FILE_PATH_FIELDNAME}' property." + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) - return final_paths + return resolved_paths - def _unpack_file_bundle(self, bundle_path: Path) -> list[Path]: - """Unpacks a file bundle (zip, tar, tgz, etc.) and returns extracted file paths. + def _unpack_and_collect_files(self, paths_with_flags: list[tuple[Path, bool]]) -> list[tuple[Path, bool]]: + """Recursively unpack bundles and collect files. Args: - bundle_path (Path): Path to the file bundle. + paths_with_flags (list[tuple[Path, bool]]): List of input paths and their delete-after-processing flags. Returns: - list[Path]: Paths to the extracted files. - - Raises: - ValueError: If the bundle contains no valid files or cannot be read. + list[tuple[Path, bool]]: List of all files after unpacking bundles, along with their delete-after-processing flags. """ - unpacked_files = [] - - def extract_files(list_files_callable: Callable, extract_file_callable: Callable): - """Helper to validate and extract files from the bundle.""" - valid_files = [ - file for file in list_files_callable() - if ( - any(file.endswith(f".{ext}") for ext in self.valid_extensions) - and not file.startswith(tuple(self.IGNORE_STARTS_WITH)) - ) - ] - - if not valid_files: - msg = f"No valid files in the bundle: {bundle_path.name}." 
- self.log(msg) - if not self.silent_errors: - raise ValueError(msg) + collected_files_with_flags = [] + + for path, delete_after_processing in paths_with_flags: + if path.is_dir(): + # Recurse into directories + for sub_path in path.rglob('*'): # Use rglob to recursively find all files and directories + if sub_path.is_file(): # Only add files + collected_files_with_flags.append((sub_path, delete_after_processing)) + elif path.suffix[1:] in self.SUPPORTED_BUNDLE_EXTENSIONS: + # Unpack supported bundles + temp_dir = TemporaryDirectory() + self._temp_dirs.append(temp_dir) + temp_dir_path = Path(temp_dir.name) + self._unpack_bundle(path, temp_dir_path) + subpaths = list(temp_dir_path.iterdir()) + self.log(f"Unpacked bundle {path.name} into {subpaths}") + for sub_path in subpaths: + if sub_path.is_dir(): + # Add directory to process its contents later + collected_files_with_flags.append((sub_path, delete_after_processing)) + else: + collected_files_with_flags.append((sub_path, delete_after_processing)) + else: + collected_files_with_flags.append((path, delete_after_processing)) + + # Recurse again if any directories or bundles are left in the list + if any(file.is_dir() or file.suffix[1:] in self.SUPPORTED_BUNDLE_EXTENSIONS for file, _ in collected_files_with_flags): + return self._unpack_and_collect_files(collected_files_with_flags) + + return collected_files_with_flags + + def _unpack_bundle(self, bundle_path: Path, output_dir: Path): + """Unpack a bundle into a temporary directory. - for file in valid_files: - with NamedTemporaryFile(delete=False) as temp_file: - temp_path = Path(temp_file.name).with_name(file) - file_content = extract_file_callable(file) - if file_content: - temp_path.write_bytes(file_content.read()) - unpacked_files.append(temp_path) + Args: + bundle_path (Path): Path to the bundle. + output_dir (Path): Directory where files will be extracted. + Raises: + ValueError: If the bundle format is unsupported or cannot be read. + """ if is_zipfile(bundle_path): with ZipFile(bundle_path, "r") as bundle: - extract_files( - list_files_callable=bundle.namelist, - extract_file_callable=bundle.open, - ) + bundle.extractall(output_dir) elif tarfile.is_tarfile(bundle_path): with tarfile.open(bundle_path, "r:*") as bundle: - extract_files( - list_files_callable=lambda: [ - member.name for member in bundle.getmembers() if member.isfile() - ], - extract_file_callable=lambda member_name: bundle.extractfile( - next(m for m in bundle.getmembers() if m.name == member_name) - ), - ) + bundle.extractall(output_dir) else: msg = f"Unsupported bundle format: {bundle_path.suffix}" self.log(msg) if not self.silent_errors: raise ValueError(msg) - return unpacked_files + def _filter_and_mark_files(self, files_with_flags: list[tuple[Path, bool]]) -> list[tuple[Path, bool]]: + """Validate file types and mark files for removal. + + Args: + files_with_flags (list[tuple[Path, bool]]): List of files and their delete-after-processing flags. + + Returns: + list[tuple[Path, bool]]: Validated files with their remove-after-processing markers. + + Raises: + ValueError: If unsupported files are encountered and `ignore_unsupported_extensions` is False. 
+ """ + final_files_with_flags = [] + ignored_files = [] + + for file, delete_after_processing in files_with_flags: + if not file.is_file(): + self.log(f"Not a file: {file.name}") + continue + + if file.suffix[1:] not in self.valid_extensions: + if self.ignore_unsupported_extensions: + ignored_files.append(file.name) + continue + else: + msg = f"Unsupported file extension: {file.suffix}" + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + + final_files_with_flags.append((file, delete_after_processing)) + + if ignored_files: + self.log(f"Ignored files: {ignored_files}") + + return final_files_with_flags diff --git a/src/backend/base/langflow/components/data/file.py b/src/backend/base/langflow/components/data/file.py index d3c5bffeb3a..dbe1e8c8fbe 100644 --- a/src/backend/base/langflow/components/data/file.py +++ b/src/backend/base/langflow/components/data/file.py @@ -23,16 +23,17 @@ class FileComponent(BaseFileComponent): *BaseFileComponent._base_inputs, BoolInput( name="use_multithreading", - display_name="Use Multithreading", + display_name="[Deprecated] Use Multithreading", advanced=True, - info="If true, parallel processing will be enabled for zip files.", + value=True, + info="Set 'Processing Concurrency' greater than 1 to enable multithreading.", ), IntInput( name="concurrency_multithreading", - display_name="Multithreading Concurrency", - advanced=True, - info="The maximum number of workers to use, if concurrency is enabled", - value=4, + display_name="Processing Concurrency", + advanced=False, + info="When multiple files are being processed, the number of files to process concurrently.", + value=1, ), ] @@ -52,33 +53,36 @@ def process_files(self, file_list: list[Path]) -> list[Data]: Returns: list[Data]: A list of parsed data objects from the processed files. """ - def process_file(file_path: Path) -> Data: + def process_file(file_path: Path, *, silent_errors: bool = False) -> Data: try: - self.log(f"Processing file: {file_path.name}.") - return parse_text_file_to_data(str(file_path), silent_errors=self.silent_errors) + return parse_text_file_to_data(str(file_path), silent_errors=silent_errors) except FileNotFoundError as e: msg = f"File not found: {file_path.name}. 
Error: {e}" self.log(msg) - if not self.silent_errors: + if not silent_errors: raise e return None except Exception as e: msg = f"Unexpected error processing {file_path.name}: {e}" self.log(msg) - if not self.silent_errors: + if not silent_errors: raise e return None + + concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading) + file_count = len(file_list) - if not self.use_multithreading: - self.log("Processing files sequentially.") + if concurrency < 2 or file_count < 2: + if file_count > 1: + self.log(f"Processing {file_count} files sequentially.") processed_data = [process_file(file) for file in file_list if file] else: - self.log(f"Starting parallel processing with max workers: {self.concurrency_multithreading}.") + self.log(f"Starting parallel processing of {file_count} files with concurrency: {concurrency}.") processed_data = parallel_load_data( file_list, silent_errors=self.silent_errors, load_function=process_file, - max_concurrency=self.concurrency_multithreading, + max_concurrency=concurrency, ) # Filter out empty results and return From 557f91959cd94f843311458835cb3928d63c7016 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 20 Nov 2024 14:09:46 +0000 Subject: [PATCH 08/20] [autofix.ci] apply automated fixes --- .../base/langflow/base/data/base_file.py | 44 +++++++++---------- .../base/langflow/components/data/file.py | 9 ++-- 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/src/backend/base/langflow/base/data/base_file.py b/src/backend/base/langflow/base/data/base_file.py index a7432464fbe..def5fb3f7e9 100644 --- a/src/backend/base/langflow/base/data/base_file.py +++ b/src/backend/base/langflow/base/data/base_file.py @@ -1,14 +1,15 @@ -from abc import abstractmethod, ABC +import shutil +import tarfile +from abc import ABC, abstractmethod from pathlib import Path from tempfile import TemporaryDirectory from zipfile import ZipFile, is_zipfile -import tarfile -import shutil from langflow.custom import Component from langflow.io import BoolInput, FileInput, HandleInput, Output from langflow.schema import Data + class BaseFileComponent(Component, ABC): """Base class for handling file processing components. @@ -28,27 +29,27 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Dynamically update FileInput to include valid extensions and bundles self._base_inputs[0].file_types = [*self.valid_extensions, *self.SUPPORTED_BUNDLE_EXTENSIONS] - + file_types = ", ".join(self.valid_extensions) bundles = ", ".join(self.SUPPORTED_BUNDLE_EXTENSIONS) - self._base_inputs[0].info = ( - f"Supported file extensions: {file_types}; optionally bundled in file extensions: {bundles}" - ) + self._base_inputs[ + 0 + ].info = f"Supported file extensions: {file_types}; optionally bundled in file extensions: {bundles}" _base_inputs = [ FileInput( name="path", display_name="Path", file_types=[], # Dynamically set in __init__ - info="", # Dynamically set in __init__ + info="", # Dynamically set in __init__ required=False, ), HandleInput( name="file_path", display_name="Server File Path", info=( - f"Data object with a '{SERVER_FILE_PATH_FIELDNAME}' property pointing to server file. " - "Supercedes 'Path'. " + f"Data object with a '{SERVER_FILE_PATH_FIELDNAME}' property pointing to server file. " + "Supercedes 'Path'. 
" ), required=False, input_types=["Data"], @@ -75,9 +76,7 @@ def __init__(self, *args, **kwargs): ), ] - _base_outputs = [ - Output(display_name="Data", name="data", method="load_files") - ] + _base_outputs = [Output(display_name="Data", name="data", method="load_files")] @abstractmethod def process_files(self, file_list: list[Path]) -> list[Data]: @@ -89,7 +88,6 @@ def process_files(self, file_list: list[Path]) -> list[Data]: Returns: list[Data]: A list of parsed data objects from the processed files. """ - pass def load_files(self) -> list[Data]: """Loads and parses file(s), including unpacked file bundles. @@ -184,7 +182,7 @@ def add_path(path: str, *, delete_after_processing: bool): raise ValueError(msg) resolved_paths.append((resolved_path, delete_after_processing)) - if self.path and not self.file_path: # Only process self.path if file_path is not provided + if self.path and not self.file_path: # Only process self.path if file_path is not provided add_path(self.path, delete_after_processing=False) # Files from self.path are never deleted elif self.file_path: if isinstance(self.file_path, Data): @@ -223,7 +221,7 @@ def _unpack_and_collect_files(self, paths_with_flags: list[tuple[Path, bool]]) - for path, delete_after_processing in paths_with_flags: if path.is_dir(): # Recurse into directories - for sub_path in path.rglob('*'): # Use rglob to recursively find all files and directories + for sub_path in path.rglob("*"): # Use rglob to recursively find all files and directories if sub_path.is_file(): # Only add files collected_files_with_flags.append((sub_path, delete_after_processing)) elif path.suffix[1:] in self.SUPPORTED_BUNDLE_EXTENSIONS: @@ -244,7 +242,10 @@ def _unpack_and_collect_files(self, paths_with_flags: list[tuple[Path, bool]]) - collected_files_with_flags.append((path, delete_after_processing)) # Recurse again if any directories or bundles are left in the list - if any(file.is_dir() or file.suffix[1:] in self.SUPPORTED_BUNDLE_EXTENSIONS for file, _ in collected_files_with_flags): + if any( + file.is_dir() or file.suffix[1:] in self.SUPPORTED_BUNDLE_EXTENSIONS + for file, _ in collected_files_with_flags + ): return self._unpack_and_collect_files(collected_files_with_flags) return collected_files_with_flags @@ -295,11 +296,10 @@ def _filter_and_mark_files(self, files_with_flags: list[tuple[Path, bool]]) -> l if self.ignore_unsupported_extensions: ignored_files.append(file.name) continue - else: - msg = f"Unsupported file extension: {file.suffix}" - self.log(msg) - if not self.silent_errors: - raise ValueError(msg) + msg = f"Unsupported file extension: {file.suffix}" + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) final_files_with_flags.append((file, delete_after_processing)) diff --git a/src/backend/base/langflow/components/data/file.py b/src/backend/base/langflow/components/data/file.py index dbe1e8c8fbe..711bae86ca5 100644 --- a/src/backend/base/langflow/components/data/file.py +++ b/src/backend/base/langflow/components/data/file.py @@ -9,9 +9,10 @@ class FileComponent(BaseFileComponent): """Handles loading and processing of individual or zipped text files. - This component supports processing multiple valid files within a zip archive, + This component supports processing multiple valid files within a zip archive, resolving paths, validating file types, and optionally using multithreading for processing. """ + display_name = "File" description = "Load a file to be used in your project." 
icon = "file-text" @@ -53,6 +54,7 @@ def process_files(self, file_list: list[Path]) -> list[Data]: Returns: list[Data]: A list of parsed data objects from the processed files. """ + def process_file(file_path: Path, *, silent_errors: bool = False) -> Data: try: return parse_text_file_to_data(str(file_path), silent_errors=silent_errors) @@ -68,12 +70,12 @@ def process_file(file_path: Path, *, silent_errors: bool = False) -> Data: if not silent_errors: raise e return None - + concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading) file_count = len(file_list) if concurrency < 2 or file_count < 2: - if file_count > 1: + if file_count > 1: self.log(f"Processing {file_count} files sequentially.") processed_data = [process_file(file) for file in file_list if file] else: @@ -87,4 +89,3 @@ def process_file(file_path: Path, *, silent_errors: bool = False) -> Data: # Filter out empty results and return return [data for data in processed_data if data] - From 7bde19d9ec6fe811d431afbfbfec00c0dbb3e3db Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Wed, 20 Nov 2024 14:26:31 +0000 Subject: [PATCH 09/20] addressing linting --- .../base/langflow/base/data/base_file.py | 50 ++++++++++++++++--- .../base/langflow/components/data/file.py | 7 +-- 2 files changed, 48 insertions(+), 9 deletions(-) diff --git a/src/backend/base/langflow/base/data/base_file.py b/src/backend/base/langflow/base/data/base_file.py index def5fb3f7e9..dd9296a6347 100644 --- a/src/backend/base/langflow/base/data/base_file.py +++ b/src/backend/base/langflow/base/data/base_file.py @@ -1,8 +1,10 @@ +import os import shutil import tarfile from abc import ABC, abstractmethod from pathlib import Path from tempfile import TemporaryDirectory +from typing import Callable from zipfile import ZipFile, is_zipfile from langflow.custom import Component @@ -214,16 +216,17 @@ def _unpack_and_collect_files(self, paths_with_flags: list[tuple[Path, bool]]) - paths_with_flags (list[tuple[Path, bool]]): List of input paths and their delete-after-processing flags. Returns: - list[tuple[Path, bool]]: List of all files after unpacking bundles, along with their delete-after-processing flags. + list[tuple[Path, bool]]: + List of all files after unpacking bundles, along with their delete-after-processing flags. """ collected_files_with_flags = [] for path, delete_after_processing in paths_with_flags: if path.is_dir(): # Recurse into directories - for sub_path in path.rglob("*"): # Use rglob to recursively find all files and directories - if sub_path.is_file(): # Only add files - collected_files_with_flags.append((sub_path, delete_after_processing)) + collected_files_with_flags.extend( + [(sub_path, delete_after_processing) for sub_path in path.rglob("*") if sub_path.is_file()] + ) elif path.suffix[1:] in self.SUPPORTED_BUNDLE_EXTENSIONS: # Unpack supported bundles temp_dir = TemporaryDirectory() @@ -250,6 +253,31 @@ def _unpack_and_collect_files(self, paths_with_flags: list[tuple[Path, bool]]) - return collected_files_with_flags + def _safe_extract( + self, + extract_func: Callable[[Path], None], + members: list[str], + output_dir: Path, + archive_type: str, + ): + """Safely extract files from an archive, ensuring no path traversal. + + Args: + extract_func (Callable): Function to perform the extraction. + members (list[str]): List of members (file paths) to extract. + output_dir (Path): Directory where files will be extracted. + archive_type (str): Type of archive (ZIP or TAR) for logging. 
+ + Raises: + ValueError: If an attempted path traversal is detected. + """ + path = str(output_dir) + for member in members: + member_path = os.path.join(path, member) + if not os.path.commonpath([path, member_path]).startswith(path): + raise ValueError(f"Attempted Path Traversal in {archive_type} File: {member}") + extract_func(output_dir) + def _unpack_bundle(self, bundle_path: Path, output_dir: Path): """Unpack a bundle into a temporary directory. @@ -262,10 +290,20 @@ def _unpack_bundle(self, bundle_path: Path, output_dir: Path): """ if is_zipfile(bundle_path): with ZipFile(bundle_path, "r") as bundle: - bundle.extractall(output_dir) + self._safe_extract( + bundle.extractall, + bundle.namelist(), + output_dir, + "ZIP", + ) elif tarfile.is_tarfile(bundle_path): with tarfile.open(bundle_path, "r:*") as bundle: - bundle.extractall(output_dir) + self._safe_extract( + lambda output_dir: bundle.extractall(output_dir), + [member.name for member in bundle.getmembers()], + output_dir, + "TAR", + ) else: msg = f"Unsupported bundle format: {bundle_path.suffix}" self.log(msg) diff --git a/src/backend/base/langflow/components/data/file.py b/src/backend/base/langflow/components/data/file.py index 711bae86ca5..619f15f5f9e 100644 --- a/src/backend/base/langflow/components/data/file.py +++ b/src/backend/base/langflow/components/data/file.py @@ -62,19 +62,20 @@ def process_file(file_path: Path, *, silent_errors: bool = False) -> Data: msg = f"File not found: {file_path.name}. Error: {e}" self.log(msg) if not silent_errors: - raise e + raise return None except Exception as e: msg = f"Unexpected error processing {file_path.name}: {e}" self.log(msg) if not silent_errors: - raise e + raise return None concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading) file_count = len(file_list) - if concurrency < 2 or file_count < 2: + PARALLEL_PROCESSING_THRESHOLD = 2 + if concurrency < PARALLEL_PROCESSING_THRESHOLD or file_count < PARALLEL_PROCESSING_THRESHOLD: if file_count > 1: self.log(f"Processing {file_count} files sequentially.") processed_data = [process_file(file) for file in file_list if file] From 816c715bd337c65afa17dc4427fa08b0ffc4c909 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 20 Nov 2024 14:27:29 +0000 Subject: [PATCH 10/20] [autofix.ci] apply automated fixes --- src/backend/base/langflow/base/data/base_file.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backend/base/langflow/base/data/base_file.py b/src/backend/base/langflow/base/data/base_file.py index dd9296a6347..bb36f10f49b 100644 --- a/src/backend/base/langflow/base/data/base_file.py +++ b/src/backend/base/langflow/base/data/base_file.py @@ -2,9 +2,9 @@ import shutil import tarfile from abc import ABC, abstractmethod +from collections.abc import Callable from pathlib import Path from tempfile import TemporaryDirectory -from typing import Callable from zipfile import ZipFile, is_zipfile from langflow.custom import Component @@ -216,7 +216,7 @@ def _unpack_and_collect_files(self, paths_with_flags: list[tuple[Path, bool]]) - paths_with_flags (list[tuple[Path, bool]]): List of input paths and their delete-after-processing flags. Returns: - list[tuple[Path, bool]]: + list[tuple[Path, bool]]: List of all files after unpacking bundles, along with their delete-after-processing flags. 
""" collected_files_with_flags = [] From 9d1675b41318f1d3fe4891d6c35e4d6b17dff153 Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Wed, 20 Nov 2024 14:52:34 +0000 Subject: [PATCH 11/20] linting part 2 --- src/backend/base/langflow/base/data/base_file.py | 11 +++++------ src/backend/base/langflow/components/data/file.py | 4 ++-- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/backend/base/langflow/base/data/base_file.py b/src/backend/base/langflow/base/data/base_file.py index bb36f10f49b..8fe5215eaad 100644 --- a/src/backend/base/langflow/base/data/base_file.py +++ b/src/backend/base/langflow/base/data/base_file.py @@ -271,12 +271,11 @@ def _safe_extract( Raises: ValueError: If an attempted path traversal is detected. """ - path = str(output_dir) for member in members: - member_path = os.path.join(path, member) - if not os.path.commonpath([path, member_path]).startswith(path): + member_path = output_dir / member + if not member_path.resolve().is_relative_to(output_dir.resolve()): raise ValueError(f"Attempted Path Traversal in {archive_type} File: {member}") - extract_func(output_dir) + extract_func(output_dir, member) def _unpack_bundle(self, bundle_path: Path, output_dir: Path): """Unpack a bundle into a temporary directory. @@ -291,7 +290,7 @@ def _unpack_bundle(self, bundle_path: Path, output_dir: Path): if is_zipfile(bundle_path): with ZipFile(bundle_path, "r") as bundle: self._safe_extract( - bundle.extractall, + lambda output_dir, member: bundle.extract(member, path=output_dir), bundle.namelist(), output_dir, "ZIP", @@ -299,7 +298,7 @@ def _unpack_bundle(self, bundle_path: Path, output_dir: Path): elif tarfile.is_tarfile(bundle_path): with tarfile.open(bundle_path, "r:*") as bundle: self._safe_extract( - lambda output_dir: bundle.extractall(output_dir), + lambda output_dir, member: bundle.extract(member, path=output_dir), [member.name for member in bundle.getmembers()], output_dir, "TAR", diff --git a/src/backend/base/langflow/components/data/file.py b/src/backend/base/langflow/components/data/file.py index 619f15f5f9e..08b1f4a1239 100644 --- a/src/backend/base/langflow/components/data/file.py +++ b/src/backend/base/langflow/components/data/file.py @@ -74,8 +74,8 @@ def process_file(file_path: Path, *, silent_errors: bool = False) -> Data: concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading) file_count = len(file_list) - PARALLEL_PROCESSING_THRESHOLD = 2 - if concurrency < PARALLEL_PROCESSING_THRESHOLD or file_count < PARALLEL_PROCESSING_THRESHOLD: + parallel_processing_threshold = 2 + if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold: if file_count > 1: self.log(f"Processing {file_count} files sequentially.") processed_data = [process_file(file) for file in file_list if file] From c727ef0eb78feb6df3c036387326a61b8abe70eb Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 20 Nov 2024 14:53:25 +0000 Subject: [PATCH 12/20] [autofix.ci] apply automated fixes --- src/backend/base/langflow/base/data/base_file.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/backend/base/langflow/base/data/base_file.py b/src/backend/base/langflow/base/data/base_file.py index 8fe5215eaad..9cf3cedfb24 100644 --- a/src/backend/base/langflow/base/data/base_file.py +++ b/src/backend/base/langflow/base/data/base_file.py @@ -1,4 +1,3 @@ -import os import shutil import tarfile from abc import ABC, abstractmethod From 
becf61ae34a3b75167c531db6c3f90db5cc55739 Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Wed, 20 Nov 2024 14:57:06 +0000 Subject: [PATCH 13/20] linting part 3 --- src/backend/base/langflow/base/data/base_file.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/backend/base/langflow/base/data/base_file.py b/src/backend/base/langflow/base/data/base_file.py index 9cf3cedfb24..a7a5bd6e308 100644 --- a/src/backend/base/langflow/base/data/base_file.py +++ b/src/backend/base/langflow/base/data/base_file.py @@ -273,7 +273,8 @@ def _safe_extract( for member in members: member_path = output_dir / member if not member_path.resolve().is_relative_to(output_dir.resolve()): - raise ValueError(f"Attempted Path Traversal in {archive_type} File: {member}") + msg = f"Attempted Path Traversal in {archive_type} File: {member}" + raise ValueError(msg) extract_func(output_dir, member) def _unpack_bundle(self, bundle_path: Path, output_dir: Path): From 849d12a83902bf5e827b855b5b7503ba7b2d27e0 Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Thu, 21 Nov 2024 14:26:07 +0000 Subject: [PATCH 14/20] preserve input fields on data objects --- .../base/langflow/base/data/base_file.py | 177 ++++++++++-------- .../base/langflow/components/data/file.py | 42 ++--- 2 files changed, 124 insertions(+), 95 deletions(-) diff --git a/src/backend/base/langflow/base/data/base_file.py b/src/backend/base/langflow/base/data/base_file.py index a7a5bd6e308..92ee9750c55 100644 --- a/src/backend/base/langflow/base/data/base_file.py +++ b/src/backend/base/langflow/base/data/base_file.py @@ -19,6 +19,37 @@ class BaseFileComponent(Component, ABC): and implement the `process_files` method. """ + class BaseFile(): + """Internal class to represent a file with additional metadata.""" + def __init__(self, data: Data, path: Path, *, delete_after_processing: bool = False): + self.data = data + self.path = path + self.delete_after_processing = delete_after_processing + + def merge_data(self, new_data: Data | None) -> Data: + """Merges new data into the existing data object, handling None safely. + + Args: + new_data (Data | None): The new Data object to merge. If None, no changes are made. + + Returns: + Data: The merged data object. + """ + if new_data is not None: + self.data = Data(data={**self.data.data, **new_data.data}) + return self.data + + def __str__(self): + max_text_length = 50 + text_preview = self.data.get_text()[:max_text_length] + if len(self.data.get_text()) > max_text_length: + text_preview += "..." + return ( + f"BaseFile(path={self.path}, " + f"delete_after_processing={self.delete_after_processing}, " + f"text_preview='{text_preview}')" + ) + # Subclasses can override these class variables VALID_EXTENSIONS = [] # To be overridden by child classes IGNORE_STARTS_WITH = [".", "__MACOSX"] @@ -80,67 +111,52 @@ def __init__(self, *args, **kwargs): _base_outputs = [Output(display_name="Data", name="data", method="load_files")] @abstractmethod - def process_files(self, file_list: list[Path]) -> list[Data]: - """Processes a list of files and returns parsed data. + def process_files(self, file_list: list[BaseFile]) -> list[BaseFile]: + """Processes a list of files. Args: - file_list (list[Path]): A list of file paths to be processed. + file_list (list[BaseFile]): A list of file objects. Returns: - list[Data]: A list of parsed data objects from the processed files. + list[BaseFile]: A list of BaseFile objects with updated `data`. 
""" def load_files(self) -> list[Data]: """Loads and parses file(s), including unpacked file bundles. - This method resolves file paths, validates extensions, and delegates - file processing to the `process_files` method. - Returns: list[Data]: Parsed data from the processed files. - - Raises: - ValueError: If no valid file is provided or file extensions are unsupported. """ - # List to keep track of temporary directories self._temp_dirs = [] - final_files_with_flags = [] try: # Step 1: Validate the provided paths - paths_with_flags = self._validate_and_resolve_paths() - - # self.log(f"paths_with_flags: {paths_with_flags}") + files = self._validate_and_resolve_paths() # Step 2: Handle bundles recursively - all_files_with_flags = self._unpack_and_collect_files(paths_with_flags) - - # self.log(f"all_files_with_flags: {all_files_with_flags}") - - # Step 3: Final validation of file types and remove-after-processing markers - final_files_with_flags = self._filter_and_mark_files(all_files_with_flags) + all_files = self._unpack_and_collect_files(files) - # self.log(f"final_files_with_flags: {final_files_with_flags}") + # Step 3: Final validation of file types + final_files = self._filter_and_mark_files(all_files) - # Extract just the paths for processing - valid_file_paths = [path for path, _ in final_files_with_flags] + # Step 4: Process files + processed_files = self.process_files(final_files) - # self.log(f"valid_file_paths: {valid_file_paths}") + # Extract Data objects to return + processed_data = [file.data for file in processed_files if file.data] - processed_data = self.process_files(valid_file_paths) - - return [data for data in processed_data if data] + return processed_data finally: # Delete temporary directories for temp_dir in self._temp_dirs: temp_dir.cleanup() # Delete files marked for deletion - for path, delete_after_processing in final_files_with_flags: - if delete_after_processing and path.exists(): - if path.is_dir(): - shutil.rmtree(path) + for file in final_files: + if file.delete_after_processing and file.path.exists(): + if file.path.is_dir(): + shutil.rmtree(file.path) else: - path.unlink() + file.path.unlink() @property def valid_extensions(self) -> list[str]: @@ -163,31 +179,39 @@ def ignore_starts_with(self) -> list[str]: """ return self.IGNORE_STARTS_WITH - def _validate_and_resolve_paths(self) -> list[tuple[Path, bool]]: - """Validate that all input paths exist and are valid. + def _validate_and_resolve_paths(self) -> list[BaseFile]: + """Validate that all input paths exist and are valid, and create BaseFile instances. Returns: - list[tuple[Path, bool]]: A list of valid paths and whether they should be deleted after processing. + list[BaseFile]: A list of valid BaseFile instances. Raises: ValueError: If any path does not exist. 
""" - resolved_paths = [] + resolved_files = [] - def add_path(path: str, *, delete_after_processing: bool): + def add_file(data: Data, path: str | Path, *, delete_after_processing: bool): resolved_path = Path(self.resolve_path(path)) if not resolved_path.exists(): msg = f"File or directory not found: {path}" self.log(msg) if not self.silent_errors: raise ValueError(msg) - resolved_paths.append((resolved_path, delete_after_processing)) + resolved_files.append(BaseFileComponent.BaseFile(data, resolved_path, delete_after_processing=delete_after_processing)) if self.path and not self.file_path: # Only process self.path if file_path is not provided - add_path(self.path, delete_after_processing=False) # Files from self.path are never deleted + # Wrap self.path into a Data object + data_obj = Data(file_path=self.path) + add_file(data=data_obj, path=self.path, delete_after_processing=False) elif self.file_path: if isinstance(self.file_path, Data): self.file_path = [self.file_path] + elif not isinstance(self.file_path, list): + msg = f"Expected list of Data objects in file_path but got {type(self.file_path)}." + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + return [] for obj in self.file_path: if not isinstance(obj, Data): @@ -199,32 +223,43 @@ def add_path(path: str, *, delete_after_processing: bool): server_file_path = obj.data.get(self.SERVER_FILE_PATH_FIELDNAME) if server_file_path: - add_path(server_file_path, delete_after_processing=self.delete_server_file_after_processing) + add_file( + data=obj, + path=server_file_path, + delete_after_processing=self.delete_server_file_after_processing, + ) else: msg = f"Data object missing '{self.SERVER_FILE_PATH_FIELDNAME}' property." self.log(msg) if not self.silent_errors: raise ValueError(msg) - return resolved_paths + return resolved_files - def _unpack_and_collect_files(self, paths_with_flags: list[tuple[Path, bool]]) -> list[tuple[Path, bool]]: - """Recursively unpack bundles and collect files. + def _unpack_and_collect_files(self, files: list[BaseFile]) -> list[BaseFile]: + """Recursively unpack bundles and collect files into BaseFile instances. Args: - paths_with_flags (list[tuple[Path, bool]]): List of input paths and their delete-after-processing flags. + files (list[BaseFile]): List of BaseFile instances to process. Returns: - list[tuple[Path, bool]]: - List of all files after unpacking bundles, along with their delete-after-processing flags. + list[BaseFile]: Updated list of BaseFile instances. 
""" - collected_files_with_flags = [] + collected_files = [] + + for file in files: + path = file.path + delete_after_processing = file.delete_after_processing + data = file.data - for path, delete_after_processing in paths_with_flags: if path.is_dir(): # Recurse into directories - collected_files_with_flags.extend( - [(sub_path, delete_after_processing) for sub_path in path.rglob("*") if sub_path.is_file()] + collected_files.extend( + [ + BaseFileComponent.BaseFile(data, sub_path, delete_after_processing=delete_after_processing) + for sub_path in path.rglob("*") + if sub_path.is_file() + ] ) elif path.suffix[1:] in self.SUPPORTED_BUNDLE_EXTENSIONS: # Unpack supported bundles @@ -235,22 +270,18 @@ def _unpack_and_collect_files(self, paths_with_flags: list[tuple[Path, bool]]) - subpaths = list(temp_dir_path.iterdir()) self.log(f"Unpacked bundle {path.name} into {subpaths}") for sub_path in subpaths: - if sub_path.is_dir(): - # Add directory to process its contents later - collected_files_with_flags.append((sub_path, delete_after_processing)) - else: - collected_files_with_flags.append((sub_path, delete_after_processing)) + collected_files.append(BaseFileComponent.BaseFile(data, sub_path, delete_after_processing=delete_after_processing)) else: - collected_files_with_flags.append((path, delete_after_processing)) + collected_files.append(file) # Recurse again if any directories or bundles are left in the list if any( - file.is_dir() or file.suffix[1:] in self.SUPPORTED_BUNDLE_EXTENSIONS - for file, _ in collected_files_with_flags + file.path.is_dir() or file.path.suffix[1:] in self.SUPPORTED_BUNDLE_EXTENSIONS + for file in collected_files ): - return self._unpack_and_collect_files(collected_files_with_flags) + return self._unpack_and_collect_files(collected_files) - return collected_files_with_flags + return collected_files def _safe_extract( self, @@ -309,38 +340,38 @@ def _unpack_bundle(self, bundle_path: Path, output_dir: Path): if not self.silent_errors: raise ValueError(msg) - def _filter_and_mark_files(self, files_with_flags: list[tuple[Path, bool]]) -> list[tuple[Path, bool]]: + def _filter_and_mark_files(self, files: list[BaseFile]) -> list[BaseFile]: """Validate file types and mark files for removal. Args: - files_with_flags (list[tuple[Path, bool]]): List of files and their delete-after-processing flags. + files (list[BaseFile]): List of BaseFile instances. Returns: - list[tuple[Path, bool]]: Validated files with their remove-after-processing markers. + list[BaseFile]: Validated BaseFile instances. Raises: ValueError: If unsupported files are encountered and `ignore_unsupported_extensions` is False. 
""" - final_files_with_flags = [] + final_files = [] ignored_files = [] - for file, delete_after_processing in files_with_flags: - if not file.is_file(): - self.log(f"Not a file: {file.name}") + for file in files: + if not file.path.is_file(): + self.log(f"Not a file: {file.path.name}") continue - if file.suffix[1:] not in self.valid_extensions: + if file.path.suffix[1:] not in self.valid_extensions: if self.ignore_unsupported_extensions: - ignored_files.append(file.name) + ignored_files.append(file.path.name) continue - msg = f"Unsupported file extension: {file.suffix}" + msg = f"Unsupported file extension: {file.path.suffix}" self.log(msg) if not self.silent_errors: raise ValueError(msg) - final_files_with_flags.append((file, delete_after_processing)) + final_files.append(file) if ignored_files: self.log(f"Ignored files: {ignored_files}") - return final_files_with_flags + return final_files diff --git a/src/backend/base/langflow/components/data/file.py b/src/backend/base/langflow/components/data/file.py index 08b1f4a1239..c5c4de276c2 100644 --- a/src/backend/base/langflow/components/data/file.py +++ b/src/backend/base/langflow/components/data/file.py @@ -1,5 +1,3 @@ -from pathlib import Path - from langflow.base.data import BaseFileComponent from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data from langflow.io import BoolInput, IntInput @@ -42,35 +40,29 @@ class FileComponent(BaseFileComponent): *BaseFileComponent._base_outputs, ] - def process_files(self, file_list: list[Path]) -> list[Data]: - """Processes a list of individual files and returns parsed data. - - This method supports optional multithreading for improved performance - when processing multiple files. - - Args: - file_list (list[Path]): A list of file paths to be processed. + def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]: - Returns: - list[Data]: A list of parsed data objects from the processed files. - """ - - def process_file(file_path: Path, *, silent_errors: bool = False) -> Data: + def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None: + """Processes a single file and returns its Data object.""" try: - return parse_text_file_to_data(str(file_path), silent_errors=silent_errors) + return parse_text_file_to_data(file_path, silent_errors=silent_errors) except FileNotFoundError as e: - msg = f"File not found: {file_path.name}. Error: {e}" + msg = f"File not found: {file_path}. 
Error: {e}" self.log(msg) if not silent_errors: raise return None except Exception as e: - msg = f"Unexpected error processing {file_path.name}: {e}" + msg = f"Unexpected error processing {file_path}: {e}" self.log(msg) if not silent_errors: raise return None + if not file_list: + self.log("No files to process.") + return file_list + concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading) file_count = len(file_list) @@ -78,15 +70,21 @@ def process_file(file_path: Path, *, silent_errors: bool = False) -> Data: if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold: if file_count > 1: self.log(f"Processing {file_count} files sequentially.") - processed_data = [process_file(file) for file in file_list if file] + processed_files = [ + (file, process_file(str(file.path), silent_errors=self.silent_errors)) for file in file_list if file + ] else: self.log(f"Starting parallel processing of {file_count} files with concurrency: {concurrency}.") + file_paths = [str(file.path) for file in file_list if file] processed_data = parallel_load_data( - file_list, + file_paths, silent_errors=self.silent_errors, load_function=process_file, max_concurrency=concurrency, ) + processed_files = zip(file_list, processed_data) + + for file, parsed_data in processed_files: + file.merge_data(parsed_data) - # Filter out empty results and return - return [data for data in processed_data if data] + return file_list From b5566456e2b6980ef029a48e49e3acf96e785621 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 21 Nov 2024 14:31:06 +0000 Subject: [PATCH 15/20] [autofix.ci] apply automated fixes --- src/backend/base/langflow/base/data/base_file.py | 16 ++++++++++------ .../base/langflow/components/data/file.py | 3 +-- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/backend/base/langflow/base/data/base_file.py b/src/backend/base/langflow/base/data/base_file.py index 92ee9750c55..b88188a6daa 100644 --- a/src/backend/base/langflow/base/data/base_file.py +++ b/src/backend/base/langflow/base/data/base_file.py @@ -19,8 +19,9 @@ class BaseFileComponent(Component, ABC): and implement the `process_files` method. 
""" - class BaseFile(): + class BaseFile: """Internal class to represent a file with additional metadata.""" + def __init__(self, data: Data, path: Path, *, delete_after_processing: bool = False): self.data = data self.path = path @@ -37,7 +38,7 @@ def merge_data(self, new_data: Data | None) -> Data: """ if new_data is not None: self.data = Data(data={**self.data.data, **new_data.data}) - return self.data + return self.data def __str__(self): max_text_length = 50 @@ -197,7 +198,9 @@ def add_file(data: Data, path: str | Path, *, delete_after_processing: bool): self.log(msg) if not self.silent_errors: raise ValueError(msg) - resolved_files.append(BaseFileComponent.BaseFile(data, resolved_path, delete_after_processing=delete_after_processing)) + resolved_files.append( + BaseFileComponent.BaseFile(data, resolved_path, delete_after_processing=delete_after_processing) + ) if self.path and not self.file_path: # Only process self.path if file_path is not provided # Wrap self.path into a Data object @@ -270,14 +273,15 @@ def _unpack_and_collect_files(self, files: list[BaseFile]) -> list[BaseFile]: subpaths = list(temp_dir_path.iterdir()) self.log(f"Unpacked bundle {path.name} into {subpaths}") for sub_path in subpaths: - collected_files.append(BaseFileComponent.BaseFile(data, sub_path, delete_after_processing=delete_after_processing)) + collected_files.append( + BaseFileComponent.BaseFile(data, sub_path, delete_after_processing=delete_after_processing) + ) else: collected_files.append(file) # Recurse again if any directories or bundles are left in the list if any( - file.path.is_dir() or file.path.suffix[1:] in self.SUPPORTED_BUNDLE_EXTENSIONS - for file in collected_files + file.path.is_dir() or file.path.suffix[1:] in self.SUPPORTED_BUNDLE_EXTENSIONS for file in collected_files ): return self._unpack_and_collect_files(collected_files) diff --git a/src/backend/base/langflow/components/data/file.py b/src/backend/base/langflow/components/data/file.py index c5c4de276c2..0c27a866b36 100644 --- a/src/backend/base/langflow/components/data/file.py +++ b/src/backend/base/langflow/components/data/file.py @@ -41,7 +41,6 @@ class FileComponent(BaseFileComponent): ] def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]: - def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None: """Processes a single file and returns its Data object.""" try: @@ -82,7 +81,7 @@ def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None: load_function=process_file, max_concurrency=concurrency, ) - processed_files = zip(file_list, processed_data) + processed_files = zip(file_list, processed_data, strict=False) for file, parsed_data in processed_files: file.merge_data(parsed_data) From 3adbaa75bd63f6f8e1f8fd1c0ceaba8e94e4a656 Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Thu, 21 Nov 2024 16:59:43 +0000 Subject: [PATCH 16/20] ensuring processed data is linked to correct file data object --- .../base/langflow/base/data/base_file.py | 125 +++++++++++++++--- .../base/langflow/components/data/file.py | 21 ++- 2 files changed, 122 insertions(+), 24 deletions(-) diff --git a/src/backend/base/langflow/base/data/base_file.py b/src/backend/base/langflow/base/data/base_file.py index b88188a6daa..0fb8376e13f 100644 --- a/src/backend/base/langflow/base/data/base_file.py +++ b/src/backend/base/langflow/base/data/base_file.py @@ -22,33 +22,73 @@ class BaseFileComponent(Component, ABC): class BaseFile: """Internal class to represent a file with 
additional metadata.""" - def __init__(self, data: Data, path: Path, *, delete_after_processing: bool = False): + def __init__(self, data: Data | list[Data], path: Path, *, delete_after_processing: bool = False): self.data = data self.path = path self.delete_after_processing = delete_after_processing - def merge_data(self, new_data: Data | None) -> Data: - """Merges new data into the existing data object, handling None safely. + @property + def data(self) -> list[Data]: + return self._data or [] + + @data.setter + def data(self, value: Data | list[Data]): + if isinstance(value, Data): + self._data = [value] + elif isinstance(value, list) and all(isinstance(item, Data) for item in value): + self._data = value + else: + msg = f"data must be a Data object or a list of Data objects. Got: {type(value)}" + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + + def merge_data(self, new_data: Data | list[Data] | None) -> list[Data]: + r"""Generate a new list of Data objects by merging `new_data` into the current `data`. Args: - new_data (Data | None): The new Data object to merge. If None, no changes are made. + new_data (Data | list[Data] | None): The new Data object(s) to merge into each existing Data object. + If None, the current `data` is returned unchanged. Returns: - Data: The merged data object. + list[Data]: A new list of Data objects with `new_data` merged. """ - if new_data is not None: - self.data = Data(data={**self.data.data, **new_data.data}) - return self.data + if new_data is None: + return self.data + + if isinstance(new_data, Data): + new_data_list = [new_data] + elif isinstance(new_data, list) and all(isinstance(item, Data) for item in new_data): + new_data_list = new_data + else: + msg = "new_data must be a Data object, a list of Data objects, or None." + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + return self.data + + merged_data = [] + for data in self.data: + for new_data_item in new_data_list: + merged_data.append(Data(data={**data.data, **new_data_item.data})) + + return merged_data def __str__(self): - max_text_length = 50 - text_preview = self.data.get_text()[:max_text_length] - if len(self.data.get_text()) > max_text_length: - text_preview += "..." + if len(self.data) == 0: + text_preview = "" + elif len(self.data) == 1: + max_text_length = 50 + text_preview = self.data.get_text()[:max_text_length] + if len(self.data.get_text()) > max_text_length: + text_preview += "..." 
+ text_preview = f"text_preview='{text_preview}'" + else: + text_preview = f"{len(self.data)} data objects" return ( - f"BaseFile(path={self.path}, " - f"delete_after_processing={self.delete_after_processing}, " - f"text_preview='{text_preview}')" + f"BaseFile(path={self.path}" + f", delete_after_processing={self.delete_after_processing}" + f", {text_preview}" ) # Subclasses can override these class variables @@ -142,8 +182,8 @@ def load_files(self) -> list[Data]: # Step 4: Process files processed_files = self.process_files(final_files) - # Extract Data objects to return - processed_data = [file.data for file in processed_files if file.data] + # Extract and flatten Data objects to return + processed_data = [data for file in processed_files for data in file.data if file.data] return processed_data finally: @@ -180,6 +220,57 @@ def ignore_starts_with(self) -> list[str]: """ return self.IGNORE_STARTS_WITH + def rollup_data( + self, + base_files: list[BaseFile], + data_list: list[Data | None], + path_field: str = SERVER_FILE_PATH_FIELDNAME, + ) -> list[BaseFile]: + """Rolls up Data objects into corresponding BaseFile objects, preserving the order of the + original BaseFile list. + + Args: + base_files (list[BaseFile]): The original BaseFile objects. + data_list (list[Data | None]): The list of data to be aggregated into the BaseFile objects. + path_field (str): The field name on the data_list objects that holds the file path as a string. + + Returns: + list[BaseFile]: A new list of BaseFile objects with merged `data` attributes. + """ + def _build_data_dict(data_list: list[Data | None], data_list_field: str) -> dict[str, list[Data]]: + """Builds a dictionary grouping Data objects by a specified field.""" + data_dict = {} + for data in data_list: + if data is None: + continue + key = data.data.get(data_list_field) + if key is None: + msg = f"Data object missing required field '{data_list_field}': {data}" + self.log(msg) + if not self.silent_errors: + raise ValueError(f"Data object missing required field '{data_list_field}': {data}") + continue + data_dict.setdefault(key, []).append(data) + return data_dict + + # Build the data dictionary from the provided data_list + data_dict = _build_data_dict(data_list, path_field) + + # Generate the updated list of BaseFile objects, preserving the order of base_files + updated_base_files = [] + for base_file in base_files: + new_data_list = data_dict.get(str(base_file.path), []) + merged_data_list = base_file.merge_data(new_data_list) + updated_base_files.append( + BaseFileComponent.BaseFile( + data=merged_data_list, + path=base_file.path, + delete_after_processing=base_file.delete_after_processing, + ) + ) + + return updated_base_files + def _validate_and_resolve_paths(self) -> list[BaseFile]: """Validate that all input paths exist and are valid, and create BaseFile instances. diff --git a/src/backend/base/langflow/components/data/file.py b/src/backend/base/langflow/components/data/file.py index 0c27a866b36..17d84a28d70 100644 --- a/src/backend/base/langflow/components/data/file.py +++ b/src/backend/base/langflow/components/data/file.py @@ -41,6 +41,14 @@ class FileComponent(BaseFileComponent): ] def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]: + """Processes files either sequentially or in parallel, depending on concurrency settings. + + Args: + file_list (list[BaseFileComponent.BaseFile]): List of files to process. 
+ + Returns: + list[BaseFileComponent.BaseFile]: Updated list of files with merged data. + """ def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None: """Processes a single file and returns its Data object.""" try: @@ -69,21 +77,20 @@ def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None: if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold: if file_count > 1: self.log(f"Processing {file_count} files sequentially.") - processed_files = [ - (file, process_file(str(file.path), silent_errors=self.silent_errors)) for file in file_list if file + processed_data = [ + process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list ] else: self.log(f"Starting parallel processing of {file_count} files with concurrency: {concurrency}.") - file_paths = [str(file.path) for file in file_list if file] + file_paths = [str(file.path) for file in file_list] processed_data = parallel_load_data( file_paths, silent_errors=self.silent_errors, load_function=process_file, max_concurrency=concurrency, ) - processed_files = zip(file_list, processed_data, strict=False) - for file, parsed_data in processed_files: - file.merge_data(parsed_data) + # Use rollup_basefile_data to merge processed data with BaseFile objects + updated_files = self.rollup_data(file_list, processed_data) - return file_list + return updated_files From 8e1bdacb46278a2862d1d073e1e605a737f6c259 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 21 Nov 2024 17:00:47 +0000 Subject: [PATCH 17/20] [autofix.ci] apply automated fixes --- src/backend/base/langflow/base/data/base_file.py | 3 ++- src/backend/base/langflow/components/data/file.py | 5 ++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/backend/base/langflow/base/data/base_file.py b/src/backend/base/langflow/base/data/base_file.py index 0fb8376e13f..ca4057dfc07 100644 --- a/src/backend/base/langflow/base/data/base_file.py +++ b/src/backend/base/langflow/base/data/base_file.py @@ -226,7 +226,7 @@ def rollup_data( data_list: list[Data | None], path_field: str = SERVER_FILE_PATH_FIELDNAME, ) -> list[BaseFile]: - """Rolls up Data objects into corresponding BaseFile objects, preserving the order of the + """Rolls up Data objects into corresponding BaseFile objects, preserving the order of the original BaseFile list. Args: @@ -237,6 +237,7 @@ def rollup_data( Returns: list[BaseFile]: A new list of BaseFile objects with merged `data` attributes. """ + def _build_data_dict(data_list: list[Data | None], data_list_field: str) -> dict[str, list[Data]]: """Builds a dictionary grouping Data objects by a specified field.""" data_dict = {} diff --git a/src/backend/base/langflow/components/data/file.py b/src/backend/base/langflow/components/data/file.py index 17d84a28d70..a80dce2397d 100644 --- a/src/backend/base/langflow/components/data/file.py +++ b/src/backend/base/langflow/components/data/file.py @@ -49,6 +49,7 @@ def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[Bas Returns: list[BaseFileComponent.BaseFile]: Updated list of files with merged data. 
""" + def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None: """Processes a single file and returns its Data object.""" try: @@ -77,9 +78,7 @@ def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None: if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold: if file_count > 1: self.log(f"Processing {file_count} files sequentially.") - processed_data = [ - process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list - ] + processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list] else: self.log(f"Starting parallel processing of {file_count} files with concurrency: {concurrency}.") file_paths = [str(file.path) for file in file_list] From 5fcff31564124a1a5ef2c786aaf987d674ab4899 Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Thu, 21 Nov 2024 17:18:34 +0000 Subject: [PATCH 18/20] addressing linting --- .../base/langflow/base/data/base_file.py | 29 ++++++++++--------- .../base/langflow/components/data/file.py | 4 +-- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/backend/base/langflow/base/data/base_file.py b/src/backend/base/langflow/base/data/base_file.py index ca4057dfc07..b899078337b 100644 --- a/src/backend/base/langflow/base/data/base_file.py +++ b/src/backend/base/langflow/base/data/base_file.py @@ -67,12 +67,11 @@ def merge_data(self, new_data: Data | list[Data] | None) -> list[Data]: raise ValueError(msg) return self.data - merged_data = [] - for data in self.data: - for new_data_item in new_data_list: - merged_data.append(Data(data={**data.data, **new_data_item.data})) - - return merged_data + return [ + Data(data={**data.data, **new_data_item.data}) + for data in self.data + for new_data_item in new_data_list + ] def __str__(self): if len(self.data) == 0: @@ -183,9 +182,8 @@ def load_files(self) -> list[Data]: processed_files = self.process_files(final_files) # Extract and flatten Data objects to return - processed_data = [data for file in processed_files for data in file.data if file.data] + return [data for file in processed_files for data in file.data if file.data] - return processed_data finally: # Delete temporary directories for temp_dir in self._temp_dirs: @@ -226,8 +224,7 @@ def rollup_data( data_list: list[Data | None], path_field: str = SERVER_FILE_PATH_FIELDNAME, ) -> list[BaseFile]: - """Rolls up Data objects into corresponding BaseFile objects, preserving the order of the - original BaseFile list. + r"""Rolls up Data objects into corresponding BaseFile objects in order given by `base_files`. Args: base_files (list[BaseFile]): The original BaseFile objects. 
@@ -249,7 +246,9 @@ def _build_data_dict(data_list: list[Data | None], data_list_field: str) -> dict msg = f"Data object missing required field '{data_list_field}': {data}" self.log(msg) if not self.silent_errors: - raise ValueError(f"Data object missing required field '{data_list_field}': {data}") + msg = f"Data object missing required field '{data_list_field}': {data}" + self.log(msg) + raise ValueError(msg) continue data_dict.setdefault(key, []).append(data) return data_dict @@ -364,10 +363,12 @@ def _unpack_and_collect_files(self, files: list[BaseFile]) -> list[BaseFile]: self._unpack_bundle(path, temp_dir_path) subpaths = list(temp_dir_path.iterdir()) self.log(f"Unpacked bundle {path.name} into {subpaths}") - for sub_path in subpaths: - collected_files.append( + collected_files.extend( + [ BaseFileComponent.BaseFile(data, sub_path, delete_after_processing=delete_after_processing) - ) + for sub_path in subpaths + ] + ) else: collected_files.append(file) diff --git a/src/backend/base/langflow/components/data/file.py b/src/backend/base/langflow/components/data/file.py index a80dce2397d..95a5b772235 100644 --- a/src/backend/base/langflow/components/data/file.py +++ b/src/backend/base/langflow/components/data/file.py @@ -90,6 +90,4 @@ def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None: ) # Use rollup_basefile_data to merge processed data with BaseFile objects - updated_files = self.rollup_data(file_list, processed_data) - - return updated_files + return self.rollup_data(file_list, processed_data) From ae010513681738658bfa4587cbfd9e9b3e5613c4 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 21 Nov 2024 17:19:20 +0000 Subject: [PATCH 19/20] [autofix.ci] apply automated fixes --- src/backend/base/langflow/base/data/base_file.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/backend/base/langflow/base/data/base_file.py b/src/backend/base/langflow/base/data/base_file.py index b899078337b..a5b2b89221d 100644 --- a/src/backend/base/langflow/base/data/base_file.py +++ b/src/backend/base/langflow/base/data/base_file.py @@ -68,9 +68,7 @@ def merge_data(self, new_data: Data | list[Data] | None) -> list[Data]: return self.data return [ - Data(data={**data.data, **new_data_item.data}) - for data in self.data - for new_data_item in new_data_list + Data(data={**data.data, **new_data_item.data}) for data in self.data for new_data_item in new_data_list ] def __str__(self): From 16ed91a8e9e7fd7d21949adb1fc9ebd9c306740f Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Fri, 22 Nov 2024 15:06:00 +0000 Subject: [PATCH 20/20] fixing edge case --- src/backend/base/langflow/base/data/base_file.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/backend/base/langflow/base/data/base_file.py b/src/backend/base/langflow/base/data/base_file.py index a5b2b89221d..e53cb153dba 100644 --- a/src/backend/base/langflow/base/data/base_file.py +++ b/src/backend/base/langflow/base/data/base_file.py @@ -166,6 +166,7 @@ def load_files(self) -> list[Data]: list[Data]: Parsed data from the processed files. """ self._temp_dirs = [] + final_files = [] # Initialize to avoid UnboundLocalError try: # Step 1: Validate the provided paths files = self._validate_and_resolve_paths()
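
Illustrative usage sketch (not part of the patches above): the component name CsvFileComponent, its CSV-reading logic, and its display_name/description strings are hypothetical, while the imports, the BaseFile/process_files/rollup_data API and the SERVER_FILE_PATH_FIELDNAME constant follow the code introduced in this series. A subclass declares its VALID_EXTENSIONS, operates on BaseFile objects, and hands parsed Data back through rollup_data so each result is merged into the Data of the file it came from.

from pathlib import Path

from langflow.base.data import BaseFileComponent
from langflow.schema import Data


class CsvFileComponent(BaseFileComponent):
    """Hypothetical subclass demonstrating the hooks added in this series."""

    display_name = "CSV File"
    description = "Load CSV files (example only)."

    VALID_EXTENSIONS = ["csv"]  # extensions without the leading dot

    inputs = [*BaseFileComponent._base_inputs]
    outputs = [*BaseFileComponent._base_outputs]

    def process_files(
        self, file_list: list[BaseFileComponent.BaseFile]
    ) -> list[BaseFileComponent.BaseFile]:
        parsed: list[Data | None] = []
        for file in file_list:
            try:
                text = Path(file.path).read_text(encoding="utf-8")
                # Include the server-file-path field so rollup_data can match
                # this Data object back to the BaseFile it belongs to.
                parsed.append(
                    Data(data={self.SERVER_FILE_PATH_FIELDNAME: str(file.path), "text": text})
                )
            except OSError as exc:
                self.log(f"Could not read {file.path}: {exc}")
                parsed.append(None)

        # rollup_data groups the new Data objects by file path and merges them
        # into the existing data of each BaseFile, preserving upstream fields.
        return self.rollup_data(file_list, parsed)

Because process_files returns BaseFile objects rather than bare Data, fields supplied by an upstream component (such as the server file path) survive processing, which is the behaviour targeted by the "preserve input fields on data objects" and "ensuring processed data is linked to correct file data object" commits.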