-
Notifications
You must be signed in to change notification settings - Fork 0
/
scanfile.py
118 lines (94 loc) · 3.27 KB
/
scanfile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/python
from color import cDim
from pathlib import Path
import binascii
import hashlib
import logging
import shutil
class IScanFileReader:
    """
    Interface for the file operations used by the ScanFile class.

    Every method here is a no-op stub that returns None; concrete readers
    override them.
    """

    # Display name of the underlying file; concrete readers assign it.
    name: str = None

    def open(self):
        """Open the underlying file for reading."""

    def close(self):
        """Close the underlying file."""

    def read(self, size):
        """Read up to size units of data from the opened file."""

    def size(self):
        """Return the size of the underlying file."""

    def as_posix(self):
        """Return the location of the file as a POSIX style string."""

    def rename(self, destFile: Path):
        """Move the underlying file to destFile."""

    def unlink(self):
        """Delete the underlying file."""
class PlainFileReader(IScanFileReader):
    """
    IScanFileReader implementation backed by a file on the filesystem.
    """

    def __init__(self, fileName: Path):
        """Remember the path; the file itself is not opened yet."""
        self.__path = fileName
        self.__handle = None
        self.name = self.__path.name

    def __repr__(self):
        """Return the last path component of the file."""
        return str(self.__path.name)

    def open(self):
        """Open the file in binary read mode; warn if it is already open."""
        if self.__handle is None:
            self.__handle = open(self.__path, "rb")
        else:
            posixName = self.__path.as_posix()
            logging.warning("file is already open %s", cDim(posixName))

    def close(self):
        """Close the file; warn if it is not open."""
        handle = self.__handle
        if handle is None:
            posixName = self.__path.as_posix()
            logging.warning("file is already closed %s", cDim(posixName))
        else:
            handle.close()
            self.__handle = None

    def size(self):
        """Return the file size in bytes as reported by stat()."""
        fileStat = self.__path.stat()
        return fileStat.st_size

    def read(self, size: int):
        """Read up to size bytes from the opened file."""
        data = self.__handle.read(size)
        return data

    def as_posix(self):
        """Return the file path as a string with forward slashes."""
        return self.__path.as_posix()

    def rename(self, destFile: Path):
        """Move the file to destFile; the stored path is left unchanged."""
        shutil.move(self.__path, destFile)

    def unlink(self):
        """Remove the file from the filesystem."""
        self.__path.unlink()
class ScanFile:
    """
    ScanFile represents a found file on the filesystem.

    On init the given file is read chunk by chunk to calculate the file
    size, CRC32, SHA1 and MD5. If the given file couldn't be read, the
    OSError is logged (it is not re-raised) and isLoaded stays False.

    Attributes:
        fileName: the reader the data was loaded from.
        isLoaded: True only if the file was read without an OSError and the
            number of bytes read equals the size reported by the reader.
        size: size reported by the reader, or None if it was not obtained.
        crc: CRC32 of the data as 8 lowercase hex digits, or None on error.
        md5: MD5 hex digest of the data, or None on error.
        sha1: SHA1 hex digest of the data, or None on error.
    """

    # Number of bytes requested from the reader per read() call.
    _CHUNK_SIZE = 1024 * 1024

    def __init__(self, fileName: IScanFileReader):
        """
        Read the file behind fileName and compute its size and checksums.

        Args:
            fileName: reader giving access to the file to scan.
        """
        self.fileName = fileName
        self.isLoaded = False
        # Defaults so these attributes exist even when reading fails.
        self.size = None
        self.crc = None
        self.md5 = None
        self.sha1 = None
        logging.debug("load file %s into memory", cDim(self.fileName.as_posix()))

        # Tracks whether open() succeeded, so close() is only called on a
        # reader that was actually opened.
        isOpen = False
        try:
            fileName.open()
            isOpen = True
            self.size = fileName.size()

            fileLoaded = 0
            rawMD5 = hashlib.md5()
            rawSHA1 = hashlib.sha1()
            rawCRC = 0
            while True:
                fileData = fileName.read(self._CHUNK_SIZE)
                if not fileData:
                    break
                fileLoaded += len(fileData)
                rawMD5.update(fileData)
                rawSHA1.update(fileData)
                # Feed the running CRC back in to continue across chunks.
                rawCRC = binascii.crc32(fileData, rawCRC)
            logging.debug("file %s bytes loaded %s", cDim(self.fileName.as_posix()), self.size)

            self.crc = format(rawCRC & 0xffffffff, "0>8x")
            self.md5 = rawMD5.hexdigest()
            self.sha1 = rawSHA1.hexdigest()

            # First value is the size reported by the reader, second the
            # number of bytes actually read.
            if self.size != fileLoaded:
                logging.error("file %s file size %s does not match file system size %s",
                              cDim(self.fileName.as_posix()), cDim(self.size), cDim(fileLoaded))
            else:
                self.isLoaded = True
            logging.info("scan file %s", cDim(str(vars(self))))
        except OSError as error:
            logging.error("open file %s caused an error %s",
                          cDim(self.fileName.as_posix()), cDim(error))
        finally:
            # Close on every exit path; previously an error after a
            # successful open() left the file open.
            if isOpen:
                fileName.close()