
Commit

Merge pull request #4636 from freelawproject/update_import_harvard_pdfs
feat(harvard_pdfs): parallelize cap pdf download
quevon24 authored Oct 31, 2024
2 parents 65dabe9 + d02d1fd commit 87f292e
Showing 2 changed files with 202 additions and 77 deletions.
218 changes: 146 additions & 72 deletions cl/search/management/commands/import_harvard_pdfs.py
@@ -2,13 +2,14 @@
 import logging
 import os
 import tempfile
+import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from typing import Any, Dict, Optional, Tuple
 
 import boto3
 from django.conf import settings
 from django.core.files.base import ContentFile
 from django.core.management.base import BaseCommand
-from tqdm import tqdm
 
 from cl.lib.storage import HarvardPDFStorage
 from cl.search.models import OpinionCluster
@@ -32,6 +33,11 @@ def add_arguments(self, parser):
             type=str,
             help="Specific reporter to process (e.g., 'A.2d')",
         )
+        parser.add_argument(
+            "--start-from-reporter",
+            type=str,
+            help="Process starting from this reporter (e.g., 'A.2d')",
+        )
         parser.add_argument(
             "--resume",
             action="store_true",
@@ -51,6 +57,12 @@ def add_arguments(self, parser):
             help="Directory for reading crosswalk files",
             required=True,
         )
+        parser.add_argument(
+            "--max-workers",
+            type=int,
+            default=2,
+            help="The maximum number of concurrent processes to use.",
+        )
 
     def handle(self, *args: Any, **options: Any) -> None:
         """Handle the command execution.
@@ -64,16 +76,26 @@ def handle(self, *args: Any, **options: Any) -> None:
         logger.setLevel(logging.DEBUG)
 
         self.dry_run = options["dry_run"]
+        self.resume = options["resume"]
+        self.reporter = options["reporter"]
         self.crosswalk_dir = options["crosswalk_dir"]
+        self.max_workers = options["max_workers"]
+        self.start_from_reporter = options["start_from_reporter"]
 
         if not os.path.exists(self.crosswalk_dir):
             logger.warning(
                 f"Crosswalk directory does not exist: {self.crosswalk_dir}"
             )
             return
 
+        if self.resume and self.start_from_reporter:
+            logger.warning(
+                f"You can't combine --resume and --start-from-reporter arguments."
+            )
+            return
+
         self.setup_s3_clients()
-        self.process_crosswalks(options["reporter"], options["resume"])
+        self.process_crosswalks(self.reporter, self.resume)
 
     def setup_s3_clients(self) -> None:
         """Initialize S3 client for accessing Harvard CAP R2.
@@ -99,8 +121,18 @@ def process_crosswalks(
         :return: None
         """
         logger.info(
-            f"Processing crosswalks. Reporter: {specific_reporter}, Resume: {resume}"
+            f"Processing crosswalks. Specific reporter: {specific_reporter}, Resume: {self.resume}, Workers: {self.max_workers}"
         )
+        # Find all json files
+        reporters_files = sorted(os.listdir(self.crosswalk_dir))
+        reporters_files = [r for r in reporters_files if ".json" in r]
+
+        # Generate a list of reporters available
+        reporters = [
+            r.replace(".json", "").replace("_", ".").replace("..", ". ")
+            for r in reporters_files
+        ]
+
         last_reporter_file = os.path.join(
             self.crosswalk_dir, "last_completed_reporter.txt"
         )
@@ -120,12 +152,36 @@
         else:
             last_completed_reporter = None
 
-        for filename in sorted(os.listdir(self.crosswalk_dir)):
+        if self.start_from_reporter:
+            if self.start_from_reporter not in reporters:
+                logger.error(
+                    f"Invalid reporter to start from: {self.start_from_reporter}. Valid options: {reporters}"
+                )
+                return
+            else:
+                reporter_item_index = reporters.index(self.start_from_reporter)
+                if reporter_item_index:
+                    # Update reporters and reporters files list
+                    reporters = reporters[reporter_item_index:]
+                    reporters_files = reporters_files[reporter_item_index:]
+                    self.start_from_reporter = None
+
+        for filename in reporters_files:
             if filename.endswith(".json"):
-                reporter = filename.replace("_", ".").rstrip(".json")
+                reporter = (
+                    filename.replace(".json", "")
+                    .replace("_", ".")
+                    .replace("..", ". ")
+                )
 
                 # Skip reporters until we reach the resume point
                 if resume and last_completed_reporter:
+                    if last_completed_reporter not in reporters:
+                        logger.error(
+                            f"Invalid last completed reporter: {last_completed_reporter}. Valid options: {reporters}"
+                        )
+                        return
+
                     if reporter <= last_completed_reporter:
                         logger.info(
                             f"Skipping already processed reporter: {reporter}"
@@ -145,53 +201,102 @@ def process_crosswalks(
                 with open(last_reporter_file, "w") as f:
                     f.write(reporter)
 
-    def process_crosswalk_file(self, crosswalk_file: str) -> None:
-        """Process a single crosswalk file.
-        :param crosswalk_file: Path to the crosswalk file.
-        :return: None
-        """
-        logger.info(f"Processing crosswalk file: {crosswalk_file}")
-        with open(crosswalk_file, "r") as f:
-            crosswalk_data = json.load(f)
-
-        for entry in tqdm(crosswalk_data, desc="Processing entries"):
-            logger.debug(f"Processing entry: {entry}")
-            try:
-                cap_case_id = entry["cap_case_id"]
-                cl_cluster_id = entry["cl_cluster_id"]
-                json_path = entry["cap_path"]
-
-                # Construct the PDF path based on the JSON path
-                pdf_path = json_path.replace("cases", "case-pdfs").replace(
-                    ".json", ".pdf"
-                )
-
-                if pdf_path in self.processed_pdfs:
-                    logger.info(f"Skipping already processed PDF: {pdf_path}")
-                    continue
-
-                logger.info(f"Processing PDF: {pdf_path}")
-
-                if not self.dry_run:
-                    pdf_content = self.fetch_pdf_from_cap(pdf_path)
-                    if pdf_content:
-                        cluster = OpinionCluster.objects.get(id=cl_cluster_id)
-                        self.store_pdf_in_cl(cluster, pdf_content)
-                else:
-                    logger.info(f"Dry run: Would fetch PDF from {pdf_path}")
-
-                self.processed_pdfs.add(pdf_path)
-
-            except KeyError as e:
-                logger.error(
-                    f"Missing key in entry: {e}. Entry: {json.dumps(entry, indent=2)}"
-                )
-            except Exception as e:
-                logger.error(
-                    f"Error processing CAP ID {entry.get('cap_case_id', 'Unknown')}: {str(e)}",
-                    exc_info=True,
-                )
+    def process_entry(self, entry: Dict[str, Any]) -> int:
+        """Processes a single entry by attempting to download and store its associated
+        PDF file
+        :param entry: A dictionary containing details about the case entry.
+        :return: An integer indicating the result of the process:
+            - 1 if the PDF was successfully downloaded and stored.
+            - 0 if the PDF was not downloaded (e.g., already processed, dry run
+            mode, or an error occurred).
+        """
+        logger.debug(f"Processing entry: {entry}")
+        try:
+            cap_case_id = entry["cap_case_id"]
+            cl_cluster_id = entry["cl_cluster_id"]
+            json_path = entry["cap_path"]
+
+            # Construct the PDF path based on the JSON path
+            pdf_path = json_path.replace("cases", "case-pdfs").replace(
+                ".json", ".pdf"
+            )
+
+            if pdf_path in self.processed_pdfs:
+                logger.info(f"Skipping already processed PDF: {pdf_path}")
+                # Early abort
+                return 0
+
+            logger.info(f"Processing PDF: {pdf_path}")
+
+            if not self.dry_run:
+                cluster = OpinionCluster.objects.get(id=cl_cluster_id)
+                if not cluster.filepath_pdf_harvard:
+                    # We don't have the pdf file yet
+                    pdf_content = self.fetch_pdf_from_cap(pdf_path)
+                    if pdf_content:
+                        self.store_pdf_in_cl(cluster, pdf_content)
+                        self.processed_pdfs.add(pdf_path)
+                        # Successfully downloaded and stored
+                        return 1
+                else:
+                    logger.info(
+                        f"Cluster: {cl_cluster_id} already has a PDF file assigned: {pdf_path}"
+                    )
+            else:
+                logger.info(f"Dry run: Would fetch PDF from {pdf_path}")
+
+        except OpinionCluster.DoesNotExist:
+            logger.error(
+                f"Cluster id: {entry.get('cl_cluster_id', 'Unknown')} doesn't exist."
+            )
+        except KeyError as e:
+            logger.error(
+                f"Missing key in entry: {e}. Entry: {json.dumps(entry, indent=2)}"
+            )
+        except Exception as e:
+            logger.error(
+                f"Error processing CAP ID {entry.get('cap_case_id', 'Unknown')}: {str(e)}",
+                exc_info=True,
+            )
+
+        # No files downloaded
+        return 0
+
+    def process_crosswalk_file(self, crosswalk_file: str) -> None:
+        """Process a single crosswalk file.
+        :param crosswalk_file: Path to the crosswalk file.
+        :return: None
+        """
+        logger.info(f"Processing crosswalk file: {crosswalk_file}")
+
+        start_time = time.time()
+
+        with open(crosswalk_file, "r") as f:
+            crosswalk_data = json.load(f)
+        logger.info(f"Documents to download: {len(crosswalk_data)}")
+
+        total_downloaded = 0
+
+        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+            future_to_entry = {
+                executor.submit(self.process_entry, entry): entry
+                for entry in crosswalk_data
+            }
+
+            for future in as_completed(future_to_entry):
+                entry = future_to_entry[future]
+                try:
+                    result = future.result()
+                    if result is not None:
+                        total_downloaded += result
+                except Exception as e:
+                    logger.error(f"Error processing entry {entry}: {str(e)}")
+
+        total_time = time.time() - start_time
+        logger.info(
+            f"Finished processing all entries in the crosswalk file: {crosswalk_file} - Total time: {total_time:.2f} seconds. Total files downloaded: {total_downloaded}."
+        )
 
     def parse_cap_path(self, cap_path: str) -> Tuple[str, str, str]:
         """Extract data from CAP path.
@@ -206,37 +311,6 @@ def parse_cap_path(self, cap_path: str) -> Tuple[str, str, str]:
         case_name = parts[-1].replace(".json", "")
         return reporter_slug, volume_folder, case_name
 
-    def process_entry(self, entry: Dict[str, Any]) -> None:
-        """Process a single crosswalk entry.
-        :param entry: Dictionary containing crosswalk entry data.
-        :return: None
-        """
-        cap_case_id = entry["cap_case_id"]
-        cl_cluster_id = entry["cl_cluster_id"]
-        cap_path = entry["cap_path"]
-        logger.info(
-            f"Processing entry: cap_case_id={cap_case_id}, cl_cluster_id={cl_cluster_id}, cap_path={cap_path}"
-        )
-        try:
-            cluster = OpinionCluster.objects.get(id=cl_cluster_id)
-            logger.info(f"Found cluster: {cluster}")
-            pdf_content = self.fetch_pdf_from_cap(cap_path)
-            logger.info(
-                f"Fetched PDF content, length: {len(pdf_content) if pdf_content else 0}"
-            )
-            if pdf_content:
-                logger.info(
-                    "PDF content is not empty, calling store_pdf_in_cl"
-                )
-                self.store_pdf_in_cl(cluster, pdf_content)
-            else:
-                logger.info("PDF content is empty, skipping storage")
-        except OpinionCluster.DoesNotExist:
-            logger.info(f"Cluster not found for id: {cl_cluster_id}")
-        except Exception as e:
-            logger.error(f"Error processing entry: {str(e)}", exc_info=True)
-
     def fetch_pdf_from_cap(self, pdf_path: str) -> Optional[bytes]:
         """Fetch PDF content from CAP.
[Diff for the second changed file did not load: 56 additions & 5 deletions.]
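
A note on one fix in the diff above: the old code derived reporter names with str.rstrip(".json"), but rstrip() removes any trailing characters drawn from the given set rather than a literal suffix, so reporter names ending in those characters were silently truncated. A minimal illustration (hypothetical file name, standard Python only):

    # str.rstrip(".json") strips any trailing '.', 'j', 's', 'o', 'n'
    # characters, not the literal ".json" suffix.
    print("Minn.json".rstrip(".json"))       # -> "Mi"   (mangled)
    print("Minn.json".replace(".json", ""))  # -> "Minn" (intended)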

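For reference, a minimal sketch of how the updated command might be driven from Python via Django's call_command; the crosswalk directory and worker count here are hypothetical, and the option names mirror add_arguments() above:

    from django.core.management import call_command

    # Download CAP PDFs for a single reporter with four worker threads.
    # Note: --resume and --start-from-reporter cannot be combined (see handle()).
    call_command(
        "import_harvard_pdfs",
        crosswalk_dir="/data/crosswalks",  # required; directory must exist
        reporter="A.2d",                   # optional: limit to one reporter
        max_workers=4,                     # defaults to 2
    )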