Skip to content

Commit

Permalink
re-adding ability for APIRequest to retry and save to a file
Browse files Browse the repository at this point in the history
  • Loading branch information
mieslep committed Nov 22, 2024
1 parent 13321a3 commit 05c2d7b
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 8 deletions.
155 changes: 147 additions & 8 deletions src/backend/base/langflow/components/data/api_request.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
import asyncio
import json
import mimetypes
import re
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
from zoneinfo import ZoneInfo

import httpx
import validators
from loguru import logger

from langflow.base.curl.parse import parse_context
from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, IntInput, MessageTextInput, NestedDictInput, Output
from langflow.io import BoolInput, DataInput, DropdownInput, IntInput, MessageTextInput, NestedDictInput, Output
from langflow.schema import Data
from langflow.schema.dotdict import dotdict

Expand Down Expand Up @@ -73,6 +80,20 @@ class APIRequestComponent(Component):
value=5,
info="The timeout to use for the request.",
),
BoolInput(
name="follow_redirects",
display_name="Follow Redirects",
value=True,
info="Whether to follow http redirects.",
advanced=True,
),
BoolInput(
name="save_to_file",
display_name="Save to File",
value=False,
info="Save the API response to a temporary file",
advanced=True,
),
]

outputs = [
Expand Down Expand Up @@ -113,6 +134,9 @@ async def make_request(
headers: dict | None = None,
body: dict | None = None,
timeout: int = 5,
*,
follow_redirects: bool = True,
save_to_file: bool = False,
) -> Data:
method = method.upper()
if method not in {"GET", "POST", "PATCH", "PUT", "DELETE"}:
Expand All @@ -129,20 +153,58 @@ async def make_request(
raise ValueError(msg) from e

data = body or None
redirection_history = []

try:
response = await client.request(method, url, headers=headers, json=data, timeout=timeout)
try:
result = response.json()
except Exception: # noqa: BLE001
logger.opt(exception=True).debug("Error decoding JSON response")
result = response.text
response = await client.request(
method,
url,
headers=headers,
json=data,
timeout=timeout,
follow_redirects=follow_redirects,
)

redirection_history = [
{"url": str(redirect.url), "status_code": redirect.status_code} for redirect in response.history
]

if response.is_redirect:
redirection_history.append({"url": str(response.url), "status_code": response.status_code})

is_binary, file_path = self._response_info(response, with_file_path=save_to_file)

if save_to_file:
mode = "wb" if is_binary else "w"
encoding = response.encoding if mode == "w" else None
with file_path.open(mode, encoding=encoding) as f:
f.write(response.content if is_binary else response.text)

return Data(
data={
"source": url,
"headers": headers,
"status_code": response.status_code,
"file_path": str(file_path),
**({"redirection_history": redirection_history} if redirection_history else {}),
},
)
# Populate result when not saving to a file
if is_binary:
result = response.content
else:
try:
result = response.json()
except Exception: # noqa: BLE001
logger.opt(exception=True).debug("Error decoding JSON response")
result = response.text
return Data(
data={
"source": url,
"headers": headers,
"status_code": response.status_code,
"result": result,
**({"redirection_history": redirection_history} if redirection_history else {}),
},
)
except httpx.TimeoutException:
Expand All @@ -162,6 +224,7 @@ async def make_request(
"headers": headers,
"status_code": 500,
"error": str(exc),
**({"redirection_history": redirection_history} if redirection_history else {}),
},
)

Expand All @@ -179,6 +242,13 @@ async def make_requests(self) -> list[Data]:
headers = self.headers or {}
body = self.body or {}
timeout = self.timeout
follow_redirects = self.follow_redirects
save_to_file = self.save_to_file

invalid_urls = [url for url in urls if not validators.url(url)]
if invalid_urls:
msg = f"Invalid URLs provided: {invalid_urls}"
raise ValueError(msg)

if isinstance(self.query_params, str):
query_params = dict(parse_qsl(self.query_params))
Expand All @@ -201,9 +271,78 @@ async def make_requests(self) -> list[Data]:
async with httpx.AsyncClient() as client:
results = await asyncio.gather(
*[
self.make_request(client, method, u, headers, rec, timeout)
self.make_request(
client,
method,
u,
headers,
rec,
timeout,
follow_redirects=follow_redirects,
save_to_file=save_to_file,
)
for u, rec in zip(urls, bodies, strict=True)
]
)
self.status = results
return results

def _response_info(self, response: httpx.Response, *, with_file_path: bool = False) -> tuple[bool, Path | None]:
"""Determine the file path and whether the response content is binary.
Args:
response (Response): The HTTP response object.
with_file_path (bool): Whether to save the response content to a file.
Returns:
Tuple[bool, Path | None]:
A tuple containing a boolean indicating if the content is binary and the full file path (if applicable).
"""
# Determine if the content is binary
content_type = response.headers.get("Content-Type", "")
is_binary = "application/octet-stream" in content_type or "application/binary" in content_type

if not with_file_path:
return is_binary, None

# Step 1: Set up a subdirectory for the component in the OS temp directory
component_temp_dir = Path(tempfile.gettempdir()) / self.__class__.__name__
component_temp_dir.mkdir(parents=True, exist_ok=True)

# Step 2: Extract filename from Content-Disposition
filename = None
if "Content-Disposition" in response.headers:
content_disposition = response.headers["Content-Disposition"]
filename_match = re.search(r'filename="(.+?)"', content_disposition)
if not filename_match: # Try to match RFC 5987 style
filename_match = re.search(r"filename\*=(?:UTF-8'')?(.+)", content_disposition)
if filename_match:
extracted_filename = filename_match.group(1)
# Ensure the filename is unique
if (component_temp_dir / extracted_filename).exists():
timestamp = datetime.now(ZoneInfo("UTC")).strftime("%Y%m%d%H%M%S%f")
filename = f"{timestamp}-{extracted_filename}"
else:
filename = extracted_filename

# Step 3: Infer file extension or use part of the request URL if no filename
if not filename:
# Extract the last segment of the URL path
url_path = urlparse(response.request.url).path
base_name = Path(url_path).name # Get the last segment of the path
if not base_name: # If the path ends with a slash or is empty
base_name = "response"

# Infer file extension
extension = mimetypes.guess_extension(content_type.split(";")[0]) if content_type else None
if not extension:
extension = ".bin" if is_binary else ".txt" # Default extensions

# Combine the base name with timestamp and extension
timestamp = datetime.now(ZoneInfo("UTC")).strftime("%Y%m%d%H%M%S%f")
filename = f"{timestamp}-{base_name}{extension}"

# Step 4: Define the full file path
file_path = component_temp_dir / filename

return is_binary, file_path

Check failure on line 348 in src/backend/base/langflow/components/data/api_request.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.12)

Ruff (W292)

src/backend/base/langflow/components/data/api_request.py:348:36: W292 No newline at end of file
1 change: 1 addition & 0 deletions src/backend/base/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ dependencies = [
"fastapi-pagination>=0.12.29",
"defusedxml>=0.7.1",
"pypdf~=5.1.0",
"validators>=0.34.0",
]

[project.urls]
Expand Down

0 comments on commit 05c2d7b

Please sign in to comment.