#!/usr/bin/env python
"""
Author: Abdulmalik Banaser
Email: ab4594@rit.edu
To run the extension:
> $ osqueryi --nodisable-extensions
> $ SELECT value FROM osquery_flags WHERE name = 'extensions_socket';
> Copy the socket extension path
> $ python3 osvt.py --socket <Value returned above by Osquery>
"""
import osquery
from virus_total_apis import PublicApi as VirusTotalPublicApi
import time
import os
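@osquery.register_plugin
class VirusTotalPlugin(osquery.TablePlugin):
    """osquery table ``osvt``: VirusTotal detection rate for the files in a directory.

    Every ``SELECT`` against the table prompts (with ``input()``, i.e. on the
    terminal the extension process itself was started from) for a directory,
    hashes the files in it through osquery's ``hash`` table and looks each
    SHA-256 up with the VirusTotal public API.

    The VirusTotal API key is taken from the ``VT_API_KEY`` environment
    variable so that no secret has to live in this source file.
    """

    # Column names are the table's external interface; keep them verbatim.
    _COL_PATH = "path"
    _COL_SHA256 = "sha256"
    _COL_RATE = "virus total detection rate (%)"

    # The VirusTotal public API allows 4 requests per minute; 15 s between
    # lookups keeps a sequential scan under that limit.
    _VT_PUBLIC_API_DELAY_SECONDS = 15

    # Written into the rate column when no detection ratio could be computed.
    _RATE_UNKNOWN = "unknown"

    def name(self) -> str:
        """Return the table name exposed to osquery."""
        return "osvt"

    def columns(self) -> list:
        """Return the table schema: file path, its SHA-256 and the VT detection rate."""
        return [
            osquery.TableColumn(name=self._COL_PATH, type=osquery.STRING),
            osquery.TableColumn(name=self._COL_SHA256, type=osquery.STRING),
            osquery.TableColumn(name=self._COL_RATE, type=osquery.STRING),
        ]

    @staticmethod
    def _prompt_for_directory() -> str:
        """Prompt until the user enters the path of an existing directory."""
        directory = input("[*] Please enter the full path of a directory: ").strip()
        while not os.path.isdir(directory):
            print("[-] Directory does not exists, please try again!")
            directory = input("[*] Please enter the full path of a directory: ").strip()
        return directory

    @staticmethod
    def _hash_query(directory: str) -> str:
        """Build the osquery SQL that hashes the files directly under *directory*.

        The extension client only accepts a raw SQL string, so the user-supplied
        path has to be embedded as a literal. Doubling single quotes is the
        SQLite escaping rule; without it a path containing ``'`` would break
        out of the literal and let the user alter the query (the ``isdir``
        check in the prompt does not prevent that, since such paths are valid).
        """
        escaped = directory.replace("'", "''")
        return f"select path, sha256 from hash where directory = '{escaped}'"

    @classmethod
    def _detection_rate(cls, response: dict) -> str:
        """Format a VirusTotal file-report response as an integer percentage.

        Only ``results.positives`` and ``results.total`` are relied upon, as in
        the original lookup. A response that lacks them (hash unknown to
        VirusTotal, HTTP or quota error, malformed body) or that reports zero
        engines yields the ``unknown`` marker instead of raising and aborting
        the whole table generation.
        """
        results = response.get("results") if isinstance(response, dict) else None
        if not isinstance(results, dict):
            return cls._RATE_UNKNOWN
        positives = results.get("positives")
        total = results.get("total")
        if not isinstance(positives, int) or not isinstance(total, int) or total <= 0:
            return cls._RATE_UNKNOWN
        # Integer percentage, truncated, as the table has always reported it.
        return f"{100 * positives // total}%"

    def generate(self, context) -> list:
        """Produce one row per file in the directory the user is prompted for.

        Args:
            context: query context handed over by osquery. Unused: the directory
                comes from the interactive prompt, not from WHERE constraints.

        Returns:
            A list of ``{"path", "sha256", "virus total detection rate (%)"}``
            dicts. If the hash query fails the list is empty and the osquery
            error message is printed.
        """
        api_key = os.environ.get("VT_API_KEY", "")
        if not api_key:
            # An empty key is still passed on so the failure surfaces per lookup
            # (as "unknown") rather than as a crash.
            print("[-] VT_API_KEY is not set; VirusTotal lookups will be rejected.")
        vt = VirusTotalPublicApi(api_key)

        rows = []
        # A separate osqueryd is spawned just to run the hash query. Dropping
        # the reference at the end is what lets SpawnInstance tear the child
        # process down (it is cleaned up in its finalizer); previously the
        # process was leaked on every SELECT.
        instance = osquery.SpawnInstance()
        try:
            instance.open()
            directory = self._prompt_for_directory()
            result = instance.client.query(self._hash_query(directory))
            if result.status.code != 0:
                print("[-] Error running the query: %s" % result.status.message)
                return rows
            print("[+] Success running the query: %s" % result.status.message)

            for qrow in result.response:
                sha256 = qrow["sha256"]
                row = {self._COL_PATH: qrow["path"], self._COL_SHA256: sha256}
                if sha256:
                    row[self._COL_RATE] = self._detection_rate(vt.get_file_report(sha256))
                    time.sleep(self._VT_PUBLIC_API_DELAY_SECONDS)
                else:
                    # osquery leaves sha256 empty when it could not read the
                    # file (e.g. permissions); there is nothing to look up and
                    # no reason to spend a rate-limited request on it.
                    row[self._COL_RATE] = self._RATE_UNKNOWN
                rows.append(row)
        finally:
            del instance
        return rows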
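if __name__ == "__main__":
    # Register the table with the osquery instance named by --socket (see the
    # module docstring; start_extension reads it from argv) and serve it.
    # Whenever start_extension returns, re-register. The short pause keeps a
    # connection that fails immediately from turning this into a busy loop;
    # an exception raised by start_extension still propagates and ends the
    # process as before.
    while True:
        osquery.start_extension(name="osvt", version="1.0.0")
        time.sleep(1)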