utils.py
import hashlib
import shutil
import time
import typing
from io import BufferedReader
from multiprocessing import Pool
from pathlib import Path
from tempfile import TemporaryDirectory
from urllib import request

import pydash as py_
import streamlit as st
from bs4 import BeautifulSoup as bs4
from grobid_client.grobid_client import GrobidClient
from icecream import ic

from CONSTANTS import LLM_TEMPERATURE
from ent_extraction import extract_entities, prepare_embeddings, queries
from enums import TaskType
from models import Downloaded_PDF, Load_XML, Paper, Task, Upload_PDF
from texts import chunker, xml_to_body_text

T = typing.TypeVar("T")


@st.cache_data(persist="disk")
def query_embedder(task_type: TaskType):
    return prepare_embeddings(queries[task_type])
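# Note: st.cache_data(persist="disk") memoises the query embeddings per TaskType and
# persists them to local disk, so they survive Streamlit reruns and app restarts.
# Illustrative (hypothetical) call site, assuming TaskType exposes a DATASET member:
#   q_embeds = query_embedder(TaskType.DATASET)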


def chcksum(buffer: str | bytes | BufferedReader):
    if isinstance(buffer, str):
        buffer = buffer.encode("utf-8")
    if isinstance(buffer, BufferedReader):
        buffer = buffer.read()
    return hashlib.sha256(buffer).hexdigest()
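# Accepts a str (UTF-8 encoded), raw bytes, or an open binary file and returns the hex
# SHA-256 digest, which the functions below use as a stable file ID. Illustrative:
#   chcksum("https://example.org/paper.pdf")  # -> 64-character hex string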


def check_for_xmls(paper_id: str):
    cached_xmls = {
        path.name.replace(".grobid.tei.xml", ""): path
        for path in Path("temp/xmls").glob("**/*.grobid.tei.xml")
    }
    if paper_id in cached_xmls:
        return cached_xmls[paper_id]
    return None


def load_xml(xml: Load_XML):
    # XMLs with a None path are dropped upstream; this check is only a precaution.
    if xml.path is None or not xml.path.exists():
        return
    with open(xml.path, "r") as file:
        title = bs4(file, features="lxml").title
    if title is None:
        title = xml.id
    else:
        title = py_.human_case(title.get_text())
    return Paper(
        id=xml.id,
        title=title,
        xml_path=xml.path,
        pdf_path={path.stem: path for path in Path("temp/pdfs").glob("**/*.pdf")}[
            xml.id
        ],
    )


def gen_datetime_name():
    return f"{time.strftime('%Y%m%d%H%M%S')}{int((time.time() - int(time.time())) * 1000):03d}"


def create_random_dir(parent="."):
    new_cache_dir = f"{parent}/{gen_datetime_name()}"
    Path(new_cache_dir).mkdir(parents=True, exist_ok=True)
    return Path(new_cache_dir)
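# Illustrative: create_random_dir("temp/pdfs/") yields a fresh, timestamp-named cache
# directory such as temp/pdfs/20240101120000042/ (YYYYMMDDHHMMSS plus milliseconds).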


def upload_pdfs(
    pdfs: py_.chaining.chaining.Chain[list[Upload_PDF]],
    new_cache_dir: Path | None = None,
):
    if new_cache_dir is None:
        new_cache_dir = create_random_dir("temp/pdfs/")
    try:
        for pdf in pdfs.value():
            with open(new_cache_dir.joinpath(pdf.id).with_suffix(".pdf"), "wb") as f:
                f.write(pdf.file.getbuffer())
        return new_cache_dir
    except Exception as e:
        shutil.rmtree(new_cache_dir)
        raise Exception(
            f"{e}\nOccurred while uploading PDF {pdf.file.name}. All PDFs queued for upload have been removed."
        ) from e


def download_pdfs(urls: list[str], new_cache_dir: Path | None = None):
    with TemporaryDirectory(dir=Path("temp")) as tmpdir:
        try:
            for url in urls:
                request.urlretrieve(url, f"{tmpdir}/{chcksum(url)}.pdf")
        except Exception as e:
            raise Exception(f"Something went wrong while downloading from {url}") from e
        if new_cache_dir is None:
            new_cache_dir = create_random_dir("temp/pdfs/")
        for pdf in Path(tmpdir).glob("*.pdf"):
            with open(pdf, "rb") as file:
                f_id = chcksum(file)
            # Duplicates (same content hash) are overwritten.
            shutil.move(
                pdf.rename(pdf.with_name(f_id).with_suffix(".pdf")), new_cache_dir
            )
        return py_.chain(new_cache_dir.glob("*.pdf")).map_(
            lambda pdf: Downloaded_PDF(pdf.name.replace(".pdf", ""), pdf)
        )
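# Note: downloaded files are first named by the checksum of their URL, then renamed to
# the checksum of their contents before being moved into the cache directory, so the
# same PDF fetched from different URLs collapses into a single cached copy.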


def pdfs_to_xmls(pdf_load_dir: Path):
    new_cache_dir = create_random_dir("temp/xmls")
    try:
        client = GrobidClient(config_path="./config.json")
        client.process(
            "processFulltextDocument",
            pdf_load_dir,
            output=new_cache_dir,
            consolidate_header=False,
        )
        return py_.chain(Path(new_cache_dir).glob("**/*.grobid.tei.xml")).map_(
            lambda xml: Load_XML(xml.name.replace(".grobid.tei.xml", ""), xml)
        )
    except Exception as e:
        shutil.rmtree(new_cache_dir)
        shutil.rmtree(pdf_load_dir)
        raise Exception(f"{e}\nOccurred during conversion of PDFs to XMLs.") from e


upload_convert = py_.flow(upload_pdfs, pdfs_to_xmls)
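# upload_convert pipes the return value of upload_pdfs (the new PDF cache directory)
# straight into pdfs_to_xmls via pydash's flow(). A minimal, hypothetical call sketch,
# assuming `uploaded` is a list of Upload_PDF objects built from Streamlit file uploads:
#   xmls = upload_convert(py_.chain(uploaded))   # Chain of Load_XML records
#   papers = xmls.map_(load_xml).value()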


def task_wrapper_extract_entities(task: Task):
    start = time.perf_counter()
    task.extracted_ents = extract_entities(
        verify=task.verify,
        chunks=chunker(xml_to_body_text(task.paper.xml_path)),
        temperature=LLM_TEMPERATURE,
        entity_type=task.type,  # type: ignore
        q_embeds=query_embedder(task.type),  # type: ignore
    )
    task.time_elapsed = time.perf_counter() - start
    task.pending = False
    return task


def forker(tasks: list[T], function: typing.Callable[[T], T]):
    start = time.perf_counter()
    with Pool() as pool:
        results = pool.map(function, tasks)
    return (results, time.perf_counter() - start)
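# forker fans the per-task work out over a multiprocessing Pool (one worker per CPU by
# default) and returns the results together with the total wall-clock time. A
# hypothetical end-to-end sketch, assuming Task objects have already been built:
#   done, elapsed = forker(tasks, task_wrapper_extract_entities)
#   finished = [t for t in done if not t.pending]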