From 40a1c12f415a951a325ff79a7be0bd57244be202 Mon Sep 17 00:00:00 2001 From: c0m4r Date: Fri, 12 Jan 2024 09:31:13 +0100 Subject: [PATCH] refactoring, snake_case naming, removed unused function processExists() --- loki.py | 169 ++++++++++++++++++++++++++++---------------------------- 1 file changed, 83 insertions(+), 86 deletions(-) diff --git a/loki.py b/loki.py index 26defc5..d543f05 100755 --- a/loki.py +++ b/loki.py @@ -299,27 +299,27 @@ def scan_path(self, path): path, onerror=walk_error, followlinks=False ): # Skip paths that start with .. - newDirectories = [] + new_directories = [] for dir in directories: - skipIt = False + skip_it = False # Generate a complete path for comparisons - completePath = os.path.join(root, dir).lower() + os.sep + complete_path = os.path.join(root, dir).lower() + os.sep # Platform specific excludes for skip in self.start_excludes: - if completePath.startswith(skip): + if complete_path.startswith(skip): logger.log( "INFO", "FileScan", f"Skipping {skip} directory [fixed excludes] " "(try using --force or --alldrives)", ) - skipIt = True + skip_it = True - if not skipIt: - newDirectories.append(dir) - directories[:] = newDirectories + if not skip_it: + new_directories.append(dir) + directories[:] = new_directories loki.scan_path_files(root, directories, files, bar) @@ -354,13 +354,13 @@ def scan_path_files(self, root, directories, files, bar): extension = os.path.splitext(file_path)[1].lower() # Skip marker - skipIt = False + skip_it = False # User defined excludes for skip in self.full_excludes: if skip.search(file_path): logger.log("DEBUG", "FileScan", f"Skipping element {file_path}") - skipIt = True + skip_it = True # Linux directory skip if OS_PLATFORM == "linux": @@ -372,7 +372,7 @@ def scan_path_files(self, root, directories, files, bar): ): logger.log("INFO", "FileScan", f"Skipping {skip} element") self.LINUX_PATH_SKIPS_END[skip] = 1 - skipIt = True + skip_it = True # File mode try: @@ -394,7 +394,7 @@ def scan_path_files(self, root, directories, files, bar): continue # Skip - if skipIt: + if skip_it: continue # Counter @@ -412,7 +412,7 @@ def scan_path_files(self, root, directories, files, bar): ) continue - fileSize = os.stat(file_path).st_size + file_size = os.stat(file_path).st_size # print file_size # File Name Checks ------------------------------------------------- @@ -436,11 +436,11 @@ def scan_path_files(self, root, directories, files, bar): total_score += int(fioc["score"]) # Access check (also used for magic header detection) - firstBytesString = b"-" - hashString = "" + first_bytes_string = b"-" + hash_string = "" # Evaluate Type - fileType = loki_get_file_type( + file_type = loki_get_file_type( file_path, self.filetype_magics, self.max_filetype_magics, logger ) @@ -448,7 +448,7 @@ def scan_path_files(self, root, directories, files, bar): do_intense_check = True if ( not self.intense_mode - and fileType == "UNKNOWN" + and file_type == "UNKNOWN" and extension not in EVIL_EXTENSIONS ): if args.printall: @@ -459,21 +459,21 @@ def scan_path_files(self, root, directories, files, bar): ) do_intense_check = False - # Set fileData to an empty value - fileData = "" + # Set file_data to an empty value + file_data = "" print_filesize_info = False # Evaluations ------------------------------------------------------- # Evaluate size - fileSizeLimit = int(args.s) * 1024 - if fileSize > fileSizeLimit: + file_size_limit = int(args.s) * 1024 + if file_size > file_size_limit: # Print files do_intense_check = False print_filesize_info = True # Some file types will force intense check - if fileType == "MDMP": + if file_type == "MDMP": do_intense_check = True print_filesize_info = False @@ -482,13 +482,13 @@ def scan_path_files(self, root, directories, files, bar): logger.log( "INFO", "FileScan", - f"Scanning {file_name_cleaned} TYPE: {fileType} SIZE: {fileSize}", + f"Scanning {file_name_cleaned} TYPE: {file_type} SIZE: {file_size}", ) elif args.printall: logger.log( "INFO", "FileScan", - f"Checking {file_name_cleaned} TYPE: {fileType} SIZE: {fileSize}", + f"Checking {file_name_cleaned} TYPE: {file_type} SIZE: {file_size}", ) if print_filesize_info and args.printall: @@ -497,29 +497,29 @@ def scan_path_files(self, root, directories, files, bar): "FileScan", "Skipping file due to file size: %s TYPE: %s SIZE: %s " "CURRENT SIZE LIMIT(kilobytes): %d" - % (file_name_cleaned, fileType, fileSize, fileSizeLimit), + % (file_name_cleaned, file_type, file_size, file_size_limit), ) # Hash Check ------------------------------------------------------- # Do the check if do_intense_check: - fileData = self.get_file_data(file_path) + file_data = self.get_file_data(file_path) # First bytes - firstBytesString = "%s / %s" % ( - fileData[:20].hex(), - loki_remove_non_ascii_drop(fileData[:20]), + first_bytes_string = "%s / %s" % ( + file_data[:20].hex(), + loki_remove_non_ascii_drop(file_data[:20]), ) # Hash Eval - matchType = None - matchDesc = None - matchHash = None + match_type = None + match_desc = None + match_hash = None md5 = 0 sha1 = 0 sha256 = 0 - md5, sha1, sha256 = loki_generate_hashes(fileData) + md5, sha1, sha256 = loki_generate_hashes(file_data) md5_num = int(md5, 16) sha1_num = int(sha1, 16) sha256_num = int(sha256, 16) @@ -546,37 +546,43 @@ def scan_path_files(self, root, directories, files, bar): continue # Malware Hash - matchScore = 100 - matchLevel = "Malware" + match_score = 100 + match_level = "Malware" if ioc_contains(self.hashes_md5_list, md5_num): - matchType = "MD5" - matchDesc = self.hashes_md5[md5_num] - matchHash = md5 - matchScore = self.hashes_scores[md5_num] + match_type = "MD5" + match_desc = self.hashes_md5[md5_num] + match_hash = md5 + match_score = self.hashes_scores[md5_num] if ioc_contains(self.hashes_sha1_list, sha1_num): - matchType = "SHA1" - matchDesc = self.hashes_sha1[sha1_num] - matchHash = sha1 - matchScore = self.hashes_scores[sha1_num] + match_type = "SHA1" + match_desc = self.hashes_sha1[sha1_num] + match_hash = sha1 + match_score = self.hashes_scores[sha1_num] if ioc_contains(self.hashes_sha256_list, sha256_num): - matchType = "SHA256" - matchDesc = self.hashes_sha256[sha256_num] - matchHash = sha256 - matchScore = self.hashes_scores[sha256_num] + match_type = "SHA256" + match_desc = self.hashes_sha256[sha256_num] + match_hash = sha256 + match_score = self.hashes_scores[sha256_num] # If score is low change the description - if matchScore < 80: - matchLevel = "Suspicious" + if match_score < 80: + match_level = "Suspicious" # Hash string - hashString = f"MD5: {md5} SHA1: {sha1} SHA256: {sha256}" + hash_string = f"MD5: {md5} SHA1: {sha1} SHA256: {sha256}" - if matchType: + if match_type: reasons.append( "%s Hash TYPE: %s HASH: %s SUBSCORE: %d DESC: %s" - % (matchLevel, matchType, matchHash, matchScore, matchDesc) + % ( + match_level, + match_type, + match_hash, + match_score, + match_desc, + ) ) - total_score += matchScore + total_score += match_score # Script Anomalies Check if args.scriptanalysis: @@ -586,7 +592,7 @@ def scan_path_files(self, root, directories, files, bar): "FileScan", f"Performing character analysis on file {file_path} ... ", ) - message, score = self.script_stats_analysis(fileData) + message, score = self.script_stats_analysis(file_data) if message: reasons.append("%s SCORE: %s" % (message, score)) total_score += score @@ -594,7 +600,7 @@ def scan_path_files(self, root, directories, files, bar): # Yara Check ------------------------------------------------------- # Memory Dump Scan - if fileType == "MDMP": + if file_type == "MDMP": logger.log( "INFO", "FileScan", @@ -612,9 +618,9 @@ def scan_path_files(self, root, directories, files, bar): matched_strings, author, ) in self.scan_data( - fileData=fileData, - fileType=fileType, - fileName=file_name_cleaned, + file_data=file_data, + file_type=file_type, + file_name=file_name_cleaned, file_path=file_path_cleaned, extension=extension, md5=md5, # legacy rule support @@ -642,15 +648,15 @@ def scan_path_files(self, root, directories, files, bar): ) # Info Line ----------------------------------------------------------------------- - fileInfo = ( + file_info = ( "FILE: %s SCORE: %s TYPE: %s SIZE: %s FIRST_BYTES: %s %s %s " % ( file_path, total_score, - fileType, - fileSize, - firstBytesString, - hashString, + file_type, + file_size, + first_bytes_string, + hash_string, loki_get_age_string(file_path), ) ) @@ -670,7 +676,7 @@ def scan_path_files(self, root, directories, files, bar): continue # Reasons to message body - message_body = fileInfo + message_body = file_info for i, r in enumerate(reasons): if i < 2 or args.allreasons: message_body += "REASON_{0}: {1}".format(i + 1, r) @@ -685,10 +691,10 @@ def scan_path_files(self, root, directories, files, bar): def yara_externals( self, dummy, - fileName=b"-", + file_name=b"-", file_path=b"-", extension=b"-", - fileType="-", + file_type="-", md5="-", ): """ @@ -705,19 +711,19 @@ def yara_externals( } else: return { - "filename": fileName, + "filename": file_name, "filepath": file_path, "extension": extension, - "filetype": fileType, + "filetype": file_type, "md5": md5, "owner": "dummy", } def scan_data( self, - fileData, - fileType="-", - fileName=b"-", + file_data, + file_type="-", + file_name=b"-", file_path=b"-", extension=b"-", md5="-", @@ -731,14 +737,14 @@ def scan_data( # Yara Rule Match EXTERNALS = self.yara_externals( False, - fileName.decode("utf-8"), + file_name.decode("utf-8"), file_path.decode("utf-8"), extension, - fileType, + file_type, md5, ) matches = rules.match( - data=fileData, + data=file_data, externals=EXTERNALS, ) @@ -1477,11 +1483,11 @@ def get_file_data(self, file_path): """ get file data """ - fileData = b"" + file_data = b"" try: # Read file complete with open(file_path, "rb") as f: - fileData = f.read() + file_data = f.read() except Exception: if logger.debug: traceback.print_exc() @@ -1489,7 +1495,7 @@ def get_file_data(self, file_path): "DEBUG", "FileScan", f"Cannot open file {file_path} (access denied)" ) finally: - return fileData + return file_data def script_stats_analysis(self, data): """ @@ -1562,15 +1568,6 @@ def get_application_path(): sys.exit(1) -def processExists(pid): - """ - Checks if a given process is running - :param pid: - :return: - """ - return psutil.pid_exists(pid) - - def updateLoki(sigsOnly): """ update loki