-
Notifications
You must be signed in to change notification settings - Fork 0
/
tosecMover.py
165 lines (151 loc) · 8.29 KB
/
tosecMover.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#!/usr/bin/python
from pathlib import Path
from color import cDim
from strategydiag import StrategyDiag
from strategyrename import StrategyRename, Matcher
from strategyscan import StrategyScan
from strategyscancompressed import StrategyScanCompressed
from tosecdat import InvalidTosecFileException, TosecGameEntry, TosecHeader
import argparse
import logging
import xml.etree.ElementTree
class Tosec:
    """
    TOSEC scanner.

    On init all TOSEC DATs found in the given directory, or the single given
    file, are read into a ROM list keyed by sha1. ``scanDirectory`` then scans
    a directory and either renames or diagnoses the result, depending on the
    given arguments.
    """

    def __init__(self, tosecDir: str):
        """
        Read the TOSEC DAT file(s) at ``tosecDir`` and build the matcher.

        ``tosecDir`` may be a single DAT file or a directory. For a directory
        every direct child that is not itself a directory is read as a DAT
        file. If the path does not exist an error is logged and no matcher is
        created; ``scanDirectory`` then refuses to run.
        """
        logging.debug("Init TOSEC DAT path %s", cDim(tosecDir))
        # Assigned up front so the attribute exists on every exit path.
        # Previously the early return left it undefined and the following
        # scanDirectory() call failed with AttributeError.
        self.__matcher = None
        tosecPath = Path(tosecDir).resolve()
        if not tosecPath.exists():
            logging.error("TOSEC DAT path %s does not exists", cDim(tosecPath))
            return
        if tosecPath.is_dir():
            romList = {}
            for tosecEntry in tosecPath.iterdir():
                if tosecEntry.is_dir():
                    continue
                self.__joinRomLists(romList, self.__readTosecFile(tosecEntry))
        else:
            romList = self.__readTosecFile(tosecPath)
        self.__matcher = Matcher(romList)

    def __readTosecFile(self, tosecFile: Path) -> dict:
        """
        Read a single TOSEC DAT file.

        Returns a dictionary mapping sha1 to the list of ROM entries of the
        DAT file. A game entry that raises ``InvalidTosecFileException`` is
        skipped with a warning. If the file itself cannot be parsed a warning
        is logged and whatever was collected so far (possibly an empty
        dictionary) is returned.
        """
        datName = cDim(tosecFile.as_posix())
        fileRomList = {}
        try:
            root = xml.etree.ElementTree.parse(tosecFile.as_posix()).getroot()
            gameList = []
            header = TosecHeader(root)
            for game in root.findall("game"):
                try:
                    entry = TosecGameEntry(game, header)
                    gameRomList = self.__createGameEntryRomList(entry, fileRomList)
                    gameList.append(entry)
                    self.__joinRomLists(fileRomList, gameRomList)
                except InvalidTosecFileException as exception:
                    logging.warning("TOSEC DAT file %s parser error. Entry skipped because: %s",
                                    datName, exception)
            logging.info("TOSEC DAT file %s loaded %s entries with %s roms",
                         datName, len(gameList), len(fileRomList))
            header.games = gameList
            header.roms = fileRomList
        except (InvalidTosecFileException, xml.etree.ElementTree.ParseError) as exception:
            logging.warning("TOSEC DAT file %s parser error. File skipped because: %s",
                            datName, exception)
        return fileRomList

    def __createGameEntryRomList(self, entry: "TosecGameEntry", romList: dict) -> dict:
        """
        Build the sha1 dictionary for the ROMs of a single game entry.

        ``romList`` is the sha1 dictionary collected so far. For every ROM of
        ``entry`` whose sha1 is already in ``romList`` the owning games are
        counted. Raises ``InvalidTosecFileException`` if at least one other
        game consists of exactly the same ROMs as ``entry``. Otherwise a
        dictionary ``sha1 -> [rom]`` is returned.
        """
        dups = {}
        gameRomList = {}
        for rom in entry.roms:
            # Count, per already known game, how many ROMs it shares with this entry.
            for knownRom in romList.get(rom.sha1, ()):
                dups[knownRom.game] = dups.get(knownRom.game, 0) + 1
            gameRomList[rom.sha1] = [rom]
        # Keep only games where every ROM of that game is shared and the
        # shared count also covers every distinct sha1 of this entry.
        fullDups = [game for game, duplicateRoms in dups.items()
                    if len(game.roms) == duplicateRoms and duplicateRoms == len(gameRomList)]
        if fullDups:
            sha1s = ', '.join(str(rom.sha1) for rom in entry.roms)
            games = ', '.join(str(dup.name) for dup in fullDups)
            logging.debug("All Game ROMs %s of with sha1 %s were already added in other game %s",
                          cDim(entry.name), cDim(sha1s), cDim(games))
            raise InvalidTosecFileException(f"All Game ROMs {cDim(entry.name)} were already added in other game {cDim(games)}")
        return gameRomList

    def __joinRomLists(self, romList: dict, concatList: dict):
        """
        Merge the entries of ``concatList`` into ``romList`` in place.

        Both dictionaries map sha1 to a list of ROM entries. A sha1 that is
        new to ``romList`` is added as is. For a sha1 that already exists the
        new ROMs are appended only if md5, size and crc of the first new ROM
        match the first existing one; otherwise the new ROMs are skipped.
        """
        for entryKey, rom in concatList.items():
            rom0 = rom[0]
            if entryKey not in romList:
                romList[entryKey] = rom
                continue
            existingEntry = romList[entryKey][0]
            logging.info("TOSEC file %s with same sha1 %s and matching {md5=%s size=%s} already found in other TOSEC file %s",
                         cDim(existingEntry.game.header.name + "/" + existingEntry.name),
                         cDim(entryKey),
                         existingEntry.md5 == rom0.md5,
                         existingEntry.size == rom0.size,
                         cDim(rom0.game.header.name + "/" + rom0.name))
            if existingEntry.md5 == rom0.md5 and existingEntry.size == rom0.size and existingEntry.crc == rom0.crc:
                romList[entryKey].extend(rom)

    def scanDirectory(self, params: argparse.Namespace):
        """
        Scan a directory with the strategy chain selected by ``params``.

        ``params`` is the parsed command line. The attributes read are
        ``source``, ``dest``, ``delDupes``, ``noWritePermission``, ``diag``,
        ``noMissing``, ``noHaving``, ``scanCompressed`` and ``recursive``.

        If ``source`` is given it is scanned with a ``StrategyRename`` built
        on ``dest`` (optionally chained with ``StrategyDiag``); otherwise
        ``dest`` itself is scanned with ``StrategyDiag`` only. Problems with
        the paths are logged as errors and end the call without scanning.
        """
        if self.__matcher is None:
            # __init__ could not find the TOSEC DAT path, so there is nothing to match against.
            logging.error("no TOSEC DAT loaded, nothing to scan")
            return
        if params.source is not None:
            scanPath = Path(params.source).resolve()
            destPath = Path(params.dest).resolve()
            if not destPath.exists():
                logging.error("destination directory %s does not exists", cDim(params.dest))
                return
            if not destPath.is_dir():
                # The argument is named "dest"; the former "params.destDir"
                # raised AttributeError instead of logging this error.
                logging.error("destination %s is not a directory", cDim(params.dest))
                return
            strategy = StrategyRename(destPath, self.__matcher, params.delDupes, params.noWritePermission)
            if params.diag:
                strategy = strategy.doChain(StrategyDiag(params.noMissing, params.noHaving))
        else:
            scanPath = Path(params.dest).resolve()
            strategy = StrategyDiag(params.noMissing, params.noHaving)
        if params.scanCompressed:
            strategy = strategy.doChain(StrategyScanCompressed(self.__matcher))
        else:
            strategy = strategy.doChain(StrategyScan(self.__matcher))
        if not scanPath.exists():
            logging.error("directory %s to scan does not exsits", cDim(scanPath.as_posix()))
            return
        try:
            scanPaths = [scanPath]
            while True:
                startPaths = scanPaths
                # Each pass returns the paths to scan next.
                scanPaths = strategy.doStrategyScan(scanPaths)
                # Stop after one pass unless recursive; also stop when nothing
                # is left or the pass returned the same paths it was given.
                if not params.recursive or len(scanPaths) == 0 or startPaths == scanPaths:
                    break
        finally:
            strategy.doFinal()
# Command line entry point: declare the arguments, configure logging, load the
# TOSEC DAT(s) and run the scan. The arguments are listed as data and
# registered in this order, which is also the order used by the help output.
_ARGUMENT_SPECS = (
    (("--loglevel",), dict(choices=["error", "warning", "info", "debug"], default="warning",
                           help="Loglevel for the programm. debug - very verbose. error - only important messages")),
    (("--delDupes",), dict(action="store_true",
                           help="Delete duplicates found in source directory")),
    (("--diag",), dict(action="store_true",
                       help="Also print diagnostic information when scanning source directory. This is always enabled if source is not given")),
    (("--noHaving",), dict(action="store_true",
                           help="If in diagnostic mode don't print 'Having' files")),
    (("--noMissing",), dict(action="store_true",
                            help="If in diagnostic mode don't print 'Missing' files")),
    (("--noWritePermission",), dict(action="store_true",
                                    help="remove write permission on a renamed file")),
    (("tosec",), dict(help="filename of TOSEC DAT file or directory to process")),
    (("--source",), dict(help="source file or directory to scan")),
    (("-r",), dict(action="store_true", dest="recursive",
                   help="source directory is scaned recursively")),
    (("-x",), dict(action="store_true", dest="scanCompressed",
                   help="compressed files in source directory is scaned. Supported file formats is ZIP. **Experimental** file is only extracted but not moved")),
    (("dest",), dict(help="destination directory to move found files. If no source is given the directory is scaned without moving")),
)

parser = argparse.ArgumentParser()
for _flags, _options in _ARGUMENT_SPECS:
    parser.add_argument(*_flags, **_options)
args = parser.parse_args()

# The log level names on the command line are lower case; logging expects them upper case.
logging.basicConfig(level=args.loglevel.upper())

t = Tosec(args.tosec)
t.scanDirectory(args)