-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
162 lines (139 loc) · 5.25 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@File : utils.py
@Time : 2024/02/11 09:52:26
@Author : 不要葱姜蒜
@Version : 1.0
@Desc : File readers and token-based text chunking (ReadFiles) plus a JSON document loader (Documents).
"""
import os
from typing import Dict, List, Optional, Tuple, Union
import PyPDF2
import markdown
import html2text
import json
from tqdm import tqdm
import tiktoken
from bs4 import BeautifulSoup
import re
# Module-level tiktoken encoder for the "cl100k_base" encoding.
# ReadFiles.get_chunk calls enc.encode() on each line to measure its length in tokens.
enc = tiktoken.get_encoding("cl100k_base")
class ReadFiles:
    """
    Collect the ``.md``, ``.txt`` and ``.pdf`` files below a directory and
    turn their text into overlapping chunks bounded by a token budget.
    """

    # Suffixes collected by get_files and dispatched on by read_file_content
    # (the match is case-sensitive).
    _SUFFIXES = (".md", ".txt", ".pdf")

    def __init__(self, path: str) -> None:
        """
        Store the root directory and scan it immediately.

        Args:
            path: Directory that is walked recursively for supported files.
        """
        self._path = path
        self.file_list = self.get_files()

    def get_files(self) -> List[str]:
        """
        Walk ``self._path`` recursively and collect every supported file.

        Returns:
            Paths built by joining each walked directory with the file name,
            in ``os.walk`` order. They are absolute only if ``self._path`` is.
        """
        file_list = []
        for dirpath, _dirnames, filenames in os.walk(self._path):
            for filename in filenames:
                # str.endswith accepts a tuple, so one test covers all suffixes.
                if filename.endswith(self._SUFFIXES):
                    file_list.append(os.path.join(dirpath, filename))
        return file_list

    def get_content(
        self, max_token_len: int = 600, cover_content: int = 150
    ) -> List[str]:
        """
        Read every file in ``self.file_list`` and chunk its text.

        Args:
            max_token_len: Token budget forwarded to get_chunk.
            cover_content: Overlap size forwarded to get_chunk.

        Returns:
            The chunks of all files, concatenated in ``file_list`` order.
        """
        docs = []
        for file in self.file_list:
            content = self.read_file_content(file)
            docs.extend(
                self.get_chunk(
                    content,
                    max_token_len=max_token_len,
                    cover_content=cover_content,
                )
            )
        return docs

    @classmethod
    def get_chunk(
        cls,
        text: str,
        max_token_len: int = 600,
        cover_content: int = 150,
        encoder=None,
    ) -> List[str]:
        """
        Split ``text`` into overlapping chunks, line by line.

        Lines are accumulated until the next one would push the running token
        count past ``max_token_len - cover_content``. The chunk is then
        emitted and the next chunk starts with the last ``cover_content``
        characters of the previous one. A single line that exceeds the budget
        is cut into consecutive character slices that each fit it, so no line
        is lost or emitted unsplit.

        Note that the overlap is taken in characters but charged to the
        running count as ``cover_content`` tokens.

        Args:
            text: Text to chunk. All ASCII spaces are removed from every line
                before counting, which also joins space-separated words.
            max_token_len: Nominal upper bound for a chunk, in tokens.
            cover_content: Overlap carried into the next chunk. Must be
                non-negative and smaller than ``max_token_len``.
            encoder: Optional object whose ``encode(str)`` returns a sized
                sequence of tokens. Defaults to the module-level ``enc``.

        Returns:
            The chunks in document order. Empty chunks are never emitted.

        Raises:
            ValueError: If ``cover_content`` is negative or not smaller than
                ``max_token_len`` (no room would be left for new content).
        """
        if cover_content < 0 or max_token_len <= cover_content:
            raise ValueError(
                "cover_content must be >= 0 and smaller than max_token_len"
            )
        if encoder is None:
            encoder = enc

        token_len = max_token_len - cover_content  # budget for new content
        chunk_text = []
        curr_chunk = ""
        curr_len = 0

        for raw_line in text.splitlines():
            line = raw_line.replace(" ", "")
            line_len = len(encoder.encode(line))
            if line_len > token_len:
                pieces = cls._split_long_line(line, line_len, token_len, encoder)
            else:
                pieces = [(line, line_len)]

            last = len(pieces) - 1
            for idx, (piece, piece_len) in enumerate(pieces):
                # Only the end of the original line gets its newline back, so
                # the slices of a split line still concatenate to that line.
                newline = "\n" if idx == last else ""
                if curr_len + piece_len > token_len:
                    if curr_chunk:
                        chunk_text.append(curr_chunk)
                    # s[-0:] would copy the whole string, hence the guard.
                    curr_chunk = (
                        curr_chunk[-cover_content:] if cover_content > 0 else ""
                    )
                    curr_len = cover_content
                curr_chunk += piece + newline
                curr_len += piece_len + len(newline)

        if curr_chunk:
            chunk_text.append(curr_chunk)
        return chunk_text

    @staticmethod
    def _split_long_line(line: str, line_len: int, token_len: int, encoder):
        """
        Cut ``line`` into consecutive character slices of at most
        ``token_len`` tokens each (a single character is never split).

        Returns:
            A list of ``(slice, token_count)`` tuples covering the whole line.
        """
        # First guess: the line's average number of characters per token.
        step = max(1, len(line) * token_len // line_len)
        pieces = []
        pos = 0
        while pos < len(line):
            piece = line[pos:pos + step]
            piece_len = len(encoder.encode(piece))
            # Token density varies along a line; shrink until the slice fits.
            while piece_len > token_len and len(piece) > 1:
                piece = piece[: max(1, len(piece) * 9 // 10)]
                piece_len = len(encoder.encode(piece))
            pieces.append((piece, piece_len))
            pos += len(piece)
        return pieces

    @classmethod
    def read_file_content(cls, file_path: str) -> str:
        """
        Read a file with the reader matching its suffix.

        Args:
            file_path: Path ending in ``.pdf``, ``.md`` or ``.txt``.

        Returns:
            The text extracted from the file.

        Raises:
            ValueError: If the suffix is none of the supported ones.
        """
        if file_path.endswith(".pdf"):
            return cls.read_pdf(file_path)
        if file_path.endswith(".md"):
            return cls.read_markdown(file_path)
        if file_path.endswith(".txt"):
            return cls.read_text(file_path)
        raise ValueError("Unsupported file type")

    @classmethod
    def read_pdf(cls, file_path: str) -> str:
        """
        Extract the text of every page of a PDF and concatenate it.

        Args:
            file_path: Path of the PDF file.

        Returns:
            The page texts joined without a separator.
        """
        with open(file_path, "rb") as file:
            reader = PyPDF2.PdfReader(file)
            # `or ""` keeps the join safe should a page yield no text at all
            # (NOTE(review): some PyPDF2 releases may return None - confirm).
            return "".join(page.extract_text() or "" for page in reader.pages)

    @classmethod
    def read_markdown(cls, file_path: str) -> str:
        """
        Read a Markdown file and reduce it to plain text.

        The Markdown is rendered to HTML, the HTML is flattened to text with
        BeautifulSoup, and every run of non-whitespace characters starting
        with ``http`` is removed.

        Args:
            file_path: Path of the UTF-8 encoded Markdown file.

        Returns:
            The plain text without links.
        """
        with open(file_path, "r", encoding="utf-8") as file:
            md_text = file.read()
        html_text = markdown.markdown(md_text)
        plain_text = BeautifulSoup(html_text, "html.parser").get_text()
        return re.sub(r"http\S+", "", plain_text)

    @classmethod
    def read_text(cls, file_path: str) -> str:
        """
        Return the full content of a UTF-8 encoded text file.

        Args:
            file_path: Path of the text file.
        """
        with open(file_path, "r", encoding="utf-8") as file:
            return file.read()
class Documents:
    """
    Load an already-categorised document collection stored as JSON.
    """

    def __init__(self, path: str = "") -> None:
        """
        Remember where the JSON file lives; nothing is read at this point.

        Args:
            path: Location of the JSON file.
        """
        self.path = path

    def get_content(self):
        """
        Open ``self.path`` as UTF-8 text and parse it as JSON.

        Returns:
            Whatever object the JSON document decodes to.
        """
        with open(self.path, mode="r", encoding="utf-8") as handle:
            return json.load(handle)