#!/usr/bin/python3
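"""Hash files on disk and record them in the weltschmerz database.

Walks the configured folders, skips excluded directories and already-known
files, computes CRC32/MD5/SHA1/ed2k digests for the rest, and merges the
results into the database as LocalFile rows, attaching fid/aid identifiers
where the ed2k hash matches a known File record.
"""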
import anime
import filehash
import os
import re
import configparser
import argparse
import logging
from typing import List
from itertools import islice


def get_config(config_file="weltschmerz.cfg"):
    config = configparser.ConfigParser()
    with open(config_file) as f:
        config.read_file(f)
    parser = argparse.ArgumentParser(description="Hash files and add info to database.")
    parser.add_argument(
        "--database", help="database to use", default=config.get("client", "database")
    )
    parser.add_argument(
        "--log-file",
        dest="log_file",
        help="logfile to use",
        default=config.get("client", "log"),
    )
    parser.add_argument(
        "--folders",
        nargs="*",
        help="folders to process",
        default=re.split(r"\s+", config.get("client", "folders", fallback="")),
    )
    parser.add_argument(
        "--folders-exclude",
        dest="folders_exclude",
        nargs="*",
        help="folders to exclude",
        default=re.split(r"\s+", config.get("client", "folders_exclude")),
    )
    parser.add_argument(
        "--foldernames-exclude",
        dest="foldernames_exclude",
        nargs="*",
        help="foldernames to exclude",
        default=re.split(r"\s+", config.get("client", "foldernames_exclude")),
    )
    parser.add_argument(
        "--extensions",
        nargs="*",
        help="file extensions to process",
        default=re.split(r"\s+", config.get("client", "fileextensions")),
    )
    args = parser.parse_args()
    logging.basicConfig(
        filename=args.log_file,
        format="%(asctime)s - %(levelname)s - %(message)s",
        level=logging.DEBUG,
    )
    return args
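
# For reference, get_config() expects an INI-style weltschmerz.cfg shaped
# roughly like this (the [client] section and option names come from the
# config.get() calls above; the values are purely illustrative):
#
#   [client]
#   database = sqlite:///weltschmerz.sqlite3
#   log = filehasher.log
#   folders = /srv/media/anime
#   folders_exclude = /srv/media/anime/incomplete
#   foldernames_exclude = .git lost+found
#   fileextensions = .mkv .mp4 .avi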


class FileHasher:
    def __init__(self, db="sqlite:///:memory:", echo=False):
        self.dbs = anime.DatabaseSession(db, echo)
        self.files = []
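    # Minimal stand-in for itertools.batched (added in Python 3.12), except
    # that it yields lists rather than tuples.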
    def _batched(self, iterable, n):
        it = iter(iterable)
        while True:
            batch = list(islice(it, n))
            if not batch:
                return
            yield batch
    def get_files(self, folders, folders_exclude, foldernames_exclude, exts):
        """Walk folders and collect candidate files per directory.

        Returns a dict keyed by resolved directory path; each value holds
        "files_to_hash" (matching files found on disk) and "known_files"
        (filenames already recorded in the database for that directory).
        """
        files = {}
        for path in folders:
            logging.info("Scanning for files: %s", path)
            for dirpath, dirnames, filenames in os.walk(path, topdown=True):
                # Prune excluded directories in-place so os.walk never
                # descends into them.
                dirnames[:] = [
                    d
                    for d in dirnames
                    if not (
                        os.path.join(dirpath, d).startswith(tuple(folders_exclude))
                        or d in foldernames_exclude
                    )
                ]
                # Keep only files with a wanted extension (case-insensitive).
                filenames[:] = [
                    f for f in filenames if f.casefold().endswith(tuple(exts))
                ]
                # Replace unencodable characters so printing cannot fail.
                print(
                    f'{dirpath.encode("utf-8", errors="replace").decode("utf-8")}: '
                    f"{len(set(filenames))}"
                )
                # dirpath is constant for this directory; resolve it once.
                real_path = os.path.realpath(os.path.abspath(dirpath))
                for filename in filenames:
                    full_path = os.path.join(real_path, filename)
                    # Skip paths that cannot be encoded as UTF-8 (e.g.
                    # surrogates from undecodable filesystem names).
                    try:
                        full_path.encode("utf-8")
                    except UnicodeEncodeError as e:
                        logging.warning(f"Skipping a file: {e}")
                        continue
                    if (
                        real_path in files
                        and filename in files[real_path]["files_to_hash"]
                    ):
                        continue
                    if os.path.islink(full_path):
                        continue
                    files.setdefault(
                        real_path, {"known_files": [], "files_to_hash": []}
                    )["files_to_hash"].append(filename)
        # Check already-hashed files for these directories in batches of 1000
        # to keep the SQL IN clause to a manageable size.
        for batch in self._batched(files, 1000):
            for kf in (
                self.dbs.session.query(anime.LocalFile)
                .filter(anime.LocalFile.directory.in_(batch))
                .all()
            ):
                files[kf.directory]["known_files"].append(kf.filename)
        return files


if __name__ == "__main__":
    config = get_config()
    hasher = FileHasher(config.database, False)
    files: dict[str, dict[str, List[str]]] = hasher.get_files(
        config.folders,
        config.folders_exclude,
        config.foldernames_exclude,
        config.extensions,
    )
    for directory, data in files.items():
        # Only hash files not already recorded for this directory.
        to_hash = list(set(data["files_to_hash"]).difference(data["known_files"]))
        print(
            f'{directory}: {len(data["files_to_hash"])} file(s) found / '
            f'{len(data["known_files"])} known - {len(to_hash)} to hash'
        )
        for filename in to_hash:
            # Compute CRC32, MD5, SHA1 and ed2k digests for the file.
            fhash = filehash.FileHash(directory, filename)
            lf = anime.LocalFile(
                filename=fhash.filename,
                directory=fhash.directory,
                filesize=fhash.filesize,
                hash_crc=fhash.crc32,
                hash_md5=fhash.md5,
                hash_sha1=fhash.sha1,
                hash_ed2k=fhash.ed2k,
            )
            # Match the ed2k hash against known File records; a unique match
            # lets us copy the fid/aid identifiers onto the new row.
            known_file = (
                hasher.dbs.session.query(anime.File)
                .join(
                    anime.LocalFile, anime.LocalFile.hash_ed2k == anime.File.hash_ed2k
                )
                .filter(anime.File.hash_ed2k == lf.hash_ed2k)
                .all()
            )
            if len(known_file) == 1:
                lf.fid = known_file[0].fid
                lf.aid = known_file[0].aid
            print(f"{lf.filename}: {lf.hash_crc}")
            # merge() inserts or updates the row on its primary key; commit
            # per file so completed work survives an interrupted run.
            hasher.dbs.session.merge(lf)
            hasher.dbs.session.commit()