Merge pull request #4638 from freelawproject/regular_ingestion_pacerfree_to_caselaw

feat(scrape_pacer_free_opinions): apply task recap_document_into_opinions
flooie authored Nov 7, 2024
2 parents ce608bf + 3186804 commit e434588
Showing 3 changed files with 46 additions and 62 deletions.
52 changes: 7 additions & 45 deletions cl/corpus_importer/management/commands/recap_into_opinions.py
@@ -1,52 +1,12 @@
from asgiref.sync import async_to_sync
from django.conf import settings
from django.core.management import BaseCommand
from django.db.models import Q
from eyecite.tokenizers import HyperscanTokenizer
from httpx import (
    HTTPStatusError,
    NetworkError,
    RemoteProtocolError,
    Response,
    TimeoutException,
)

from cl.corpus_importer.tasks import ingest_recap_document

from cl.corpus_importer.tasks import recap_document_into_opinions
from cl.lib.celery_utils import CeleryThrottle
from cl.lib.command_utils import logger
from cl.lib.decorators import retry
from cl.lib.microservice_utils import microservice
from cl.search.models import SOURCES, Court, OpinionCluster, RECAPDocument

HYPERSCAN_TOKENIZER = HyperscanTokenizer(cache_dir=".hyperscan")


@retry(
    ExceptionToCheck=(
        NetworkError,
        TimeoutException,
        RemoteProtocolError,
        HTTPStatusError,
    ),
    tries=3,
    delay=5,
    backoff=2,
    logger=logger,
)
def extract_recap_document(rd: RECAPDocument) -> Response:
    """Call recap-extract from doctor with retries
    :param rd: the recap document to extract
    :return: Response object
    """
    response = async_to_sync(microservice)(
        service="recap-extract",
        item=rd,
        params={"strip_margin": True},
    )
    response.raise_for_status()
    return response
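For reference, the `retry` decorator used above (and on the renamed helper in cl/corpus_importer/tasks.py below) re-runs the call when one of the listed httpx errors is raised, waiting longer between attempts. A minimal sketch of that style of decorator, assuming the same `tries`/`delay`/`backoff` semantics; this is an illustration, not the actual `cl.lib.decorators.retry`:

import time
from functools import wraps


def simple_retry(exceptions, tries=3, delay=5, backoff=2, logger=None):
    """Retry the wrapped callable on `exceptions`, multiplying the wait each time."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            wait, attempts_left = delay, tries
            while attempts_left > 1:
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if logger:
                        logger.warning("Retrying %s in %ss", func.__name__, wait)
                    time.sleep(wait)
                    wait *= backoff
                    attempts_left -= 1
            return func(*args, **kwargs)  # final attempt; exceptions propagate

        return wrapper

    return decorator

With `tries=3, delay=5, backoff=2`, a flaky call is attempted three times, sleeping 5 and then 10 seconds between attempts.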


def import_opinions_from_recap(
    jurisdiction: str | None = None,
@@ -89,6 +49,8 @@ def import_opinions_from_recap(

    # Manually select the replica db which has an addt'l index added to
    # improve this query.
    # Since we don't have scrapers for FD courts, the last documents
    # that are not from SOURCES.RECAP should be from Harvard or other import
    latest_date_filed = (
        OpinionCluster.objects.using(db_connection)
        .filter(docket__court=court)
@@ -97,7 +59,7 @@
        .values_list("date_filed", flat=True)
        .first()
    )
    if latest_date_filed == None:
    if latest_date_filed is None:
        logger.error(
            msg=f"Court {court.id} has no opinion clusters for recap import"
        )
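The collapsed rows hide the middle of that queryset. Going by the comment, it looks up the newest opinion cluster for the court that did not come from RECAP, reading from the replica via `db_connection`. A rough sketch of such a query; the exclusion filter and ordering stand in for the hidden lines and are assumptions, not the committed code:

# Illustration only; names come from the imports at the top of this file.
latest_date_filed = (
    OpinionCluster.objects.using(db_connection)
    .filter(docket__court=court)
    .exclude(source=SOURCES.RECAP)  # assumed: keep Harvard/other imports only
    .order_by("-date_filed")  # assumed: newest cluster first
    .values_list("date_filed", flat=True)
    .first()
)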
@@ -121,8 +83,8 @@
            f"{count}: Importing rd {recap_document.id} in {court.id}"
        )
        throttle.maybe_wait()
        ingest_recap_document.apply_async(
            args=[recap_document.id, add_to_solr], queue=queue
        recap_document_into_opinions.apply_async(
            args=[{}, recap_document.id, add_to_solr], queue=queue
        )
        count += 1
        if total_count > 0 and count >= total_count:
@@ -2,12 +2,10 @@
import datetime
import inspect
import math
import os
import time
from typing import Callable, Dict, List, Optional, cast

from celery.canvas import chain
from django.conf import settings
from django.db.models import F, Q, Window
from django.db.models.functions import RowNumber
from django.utils.timezone import now
@@ -23,6 +21,7 @@
    get_and_save_free_document_report,
    mark_court_done_on_date,
    process_free_opinion_result,
    recap_document_into_opinions,
)
from cl.corpus_importer.utils import CycleChecker
from cl.lib.argparse_types import valid_date
@@ -35,9 +34,6 @@
from cl.search.models import Court, RECAPDocument
from cl.search.tasks import add_docket_to_solr_by_rds, add_items_to_solr

PACER_USERNAME = os.environ.get("PACER_USERNAME", settings.PACER_USERNAME)
PACER_PASSWORD = os.environ.get("PACER_PASSWORD", settings.PACER_PASSWORD)


def get_last_complete_date(
    court_id: str,
@@ -339,7 +335,6 @@ def get_pdfs(
f"before starting the next cycle."
)
time.sleep(2)

logger.info(f"Processing row id: {row.id} from {row.court_id}")
c = chain(
process_free_opinion_result.si(
Expand All @@ -348,8 +343,14 @@ def get_pdfs(
cnt,
).set(queue=q),
get_and_process_free_pdf.s(row.pk, row.court_id).set(queue=q),
# `recap_document_into_opinions` uses a different doctor extraction
# endpoint, so it doesn't depend on the document's content
# being extracted on `get_and_process_free_pdf`, where it's
# only extracted if it doesn't require OCR
recap_document_into_opinions.s().set(queue=q),
delete_pacer_row.s(row.pk).set(queue=q),
)

if index:
c = c | add_items_to_solr.s("search.RECAPDocument").set(queue=q)
c.apply_async()
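The chain above relies on Celery's result passing: each `.s()` signature receives the previous task's return value as its first positional argument, which is why `recap_document_into_opinions` (see cl/corpus_importer/tasks.py below) both accepts a `task_data` dict and returns it unchanged, keeping `delete_pacer_row` supplied with the data it expects. A toy sketch of the mechanism with made-up task names, not CourtListener code:

from celery import Celery, chain

app = Celery("sketch", broker="memory://")  # hypothetical app/broker, for illustration only


@app.task
def download_pdf(task_data: dict) -> dict:
    # Stand-in for get_and_process_free_pdf: annotate and pass the dict on.
    task_data["pdf_ok"] = True
    return task_data  # Celery feeds this into the next task's first argument


@app.task
def into_opinions(task_data: dict) -> dict:
    # Stand-in for recap_document_into_opinions: do its work, then return the
    # same dict so the next task in the chain still receives it.
    return task_data


@app.task
def cleanup(task_data: dict, row_pk: int) -> None:
    # Stand-in for delete_pacer_row: gets the upstream dict plus its own arg.
    print(f"cleaning row {row_pk}; upstream data: {task_data}")


# Mirrors the shape of the chain built in get_pdfs() above.
c = chain(
    download_pdf.si({"rd_pk": 123}),  # .si() ignores any prior result
    into_opinions.s(),
    cleanup.s(42),
)
# c.apply_async() would run the three tasks in order on a worker.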
43 changes: 32 additions & 11 deletions cl/corpus_importer/tasks.py
@@ -2720,7 +2720,7 @@ def query_and_save_list_of_creditors(
    backoff=2,
    logger=logger,
)
def extract_recap_document(rd: RECAPDocument) -> Response:
def extract_recap_document_for_opinions(rd: RECAPDocument) -> Response:
"""Call recap-extract from doctor with retries
:param rd: the recap document to extract
@@ -2736,17 +2736,26 @@ def extract_recap_document(rd: RECAPDocument) -> Response:


@app.task(bind=True, max_retries=5, ignore_result=True)
def ingest_recap_document(
def recap_document_into_opinions(
    self,
    recap_document_id: int,
    add_to_solr: bool,
) -> None:
    task_data: Optional[TaskData] = None,
    recap_document_id: Optional[int] = None,
    add_to_solr: bool = False,
) -> Optional[TaskData]:
    """Ingest recap document into Opinions
    :param task_data: dictionary that will contain the recap_document_id,
    if called inside a chain() on the scraper_pacer_free_opinions
    command. This task should be chained after the PDF has
    been downloaded from PACER
    :param recap_document_id: The document id to inspect and import
    :param add_to_solr: Whether to add to solr
    :return:None
    :return: The same `task_data` that came as input
    """
    if not recap_document_id and task_data:
        recap_document_id = task_data["rd_pk"]
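With this signature the task works both ways it is dispatched in this commit: directly from recap_into_opinions.py, where the leading `{}` just fills the `task_data` slot and the id is passed explicitly, and inside the `get_pdfs` chain, where Celery injects the previous task's return value as `task_data` and the id is read from `task_data["rd_pk"]` (assuming the upstream task returns a dict with that key). Roughly:

# Direct dispatch (from the recap_into_opinions management command above):
recap_document_into_opinions.apply_async(
    args=[{}, recap_document.id, add_to_solr], queue=queue
)

# Chained dispatch (from get_pdfs above): no explicit id; the upstream
# result dict arrives as task_data.
recap_document_into_opinions.s().set(queue=q)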

logger.info(f"Importing recap document {recap_document_id}")
recap_document = (
RECAPDocument.objects.select_related("docket_entry__docket")
Expand All @@ -2763,17 +2772,27 @@ def ingest_recap_document(
.get(id=recap_document_id)
)
docket = recap_document.docket_entry.docket
if recap_document.docket_entry.docket.court.jurisdiction == "FD":

jurisdiction = recap_document.docket_entry.docket.court.jurisdiction
court_id = recap_document.docket_entry.docket.court.id
# `dcd` has a regular juriscraper scraper. Avoid duplicates
if court_id in ["dcd", "orld"] or jurisdiction not in [
Court.FEDERAL_DISTRICT,
Court.FEDERAL_BANKRUPTCY,
]:
return task_data

if jurisdiction == Court.FEDERAL_DISTRICT:
if "cv" not in docket.docket_number.lower():
logger.info("Skipping non-civil opinion in district court")
return
return task_data

ops = Opinion.objects.filter(sha1=recap_document.sha1)
if ops.count() > 0:
logger.info(f"Skipping previously imported opinion: {ops[0].id}")
return
return task_data

response = extract_recap_document(rd=recap_document)
response = extract_recap_document_for_opinions(rd=recap_document)
r = response.json()

    try:
@@ -2792,7 +2811,7 @@ def ingest_recap_document(
    case_law_citations = filter_out_non_case_law_citations(citations)
    if len(case_law_citations) == 0:
        logger.info(f"No citation found for rd: {recap_document.id}")
        return
        return task_data

    with transaction.atomic():
        cluster = OpinionCluster.objects.create(
@@ -2823,3 +2842,5 @@ def ingest_recap_document(
                cluster.id
            )
        )
    # Return input task data to preserve the chain in scrape_pacer_free_opinion
    return task_data
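The collapsed lines in the hunk above presumably produce the `citations` list by running citation extraction over the text returned by doctor before a cluster is created. A standalone sketch of that step with eyecite; the sample text, the `isinstance` filter standing in for `filter_out_non_case_law_citations`, and the cache directory are illustrative assumptions (`HyperscanTokenizer` also requires the `hyperscan` package):

from eyecite import get_citations
from eyecite.models import FullCaseCitation
from eyecite.tokenizers import HyperscanTokenizer

HYPERSCAN_TOKENIZER = HyperscanTokenizer(cache_dir=".hyperscan")

text = "See Lorem v. Ipsum, 410 U.S. 113 (1973); see also 42 U.S.C. § 1983."
citations = get_citations(text, tokenizer=HYPERSCAN_TOKENIZER)

# Rough stand-in for filter_out_non_case_law_citations: keep full case-law
# citations, drop statutes, journals, id./supra references, etc.
case_law_citations = [c for c in citations if isinstance(c, FullCaseCitation)]
print(case_law_citations)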
