-
Notifications
You must be signed in to change notification settings - Fork 8
/
agent.py
147 lines (122 loc) · 5.48 KB
/
agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from os import path
from collections import OrderedDict
import xpcspy
from xpcspy.lib.types import Event
import subprocess
import json
import traceback
def convert_value(value):
if value["type"] == "ascii_string":
return value["value"]
elif value["type"] == "unicode_string":
return value["value"]
elif value["type"] == "int":
return value["value"]
elif value["type"] == "uid":
return value["value"]
elif value["type"] == "dict":
return convert_dict(value["entries"])
elif value["type"] == "array":
return convert_array(value["entries"])
else:
return json.dumps(value)
def convert_dict(obj):
return {entry["key"]["value"]: convert_value(entry["value"]) for entry in obj}
def convert_array(obj):
return [convert_value(val) for val in obj]
class Agent:
def __init__(self, filter, should_parse, session, reactor, print_timestamp=False):
"""
Initialize the Frida agent
"""
self._pending_events = (
OrderedDict()
) # A map of stacks, each stack holding events for that particular timestamp
self._filter = filter
self._should_parse = should_parse
self._print_timestamp = print_timestamp
self._script_path = path.join(path.dirname(xpcspy.__file__), "..", "_agent.js")
with open(self._script_path) as src_f:
script_src = src_f.read()
self._script = session.create_script(script_src)
self._reactor = reactor
self._agent = None
def start_hooking(self, ui):
def on_message(message, data):
self._reactor.schedule(lambda: self._on_message(message, data, ui))
self._script.on("message", on_message)
self._script.load()
ui._update_status("Installing hooks...")
self._agent = self._script.exports
self._agent.install_hooks(self._filter, self._should_parse)
def _on_message(self, message, data, ui):
mtype = message["payload"]["type"]
if mtype == "agent:hooks_installed":
ui._update_status("Hooks installed, intercepting messages...")
ui._resume()
elif mtype == "agent:trace:symbol":
symbol = message["payload"]["message"]["symbol"]
timestamp = message["payload"]["message"]["timestamp"]
if timestamp in self._pending_events:
self._pending_events[timestamp].append(Event(symbol))
else:
self._pending_events.update({timestamp: [Event(symbol)]})
elif mtype == "agent:trace:data":
timestamp = message["payload"]["message"]["timestamp"]
data = message["payload"]["message"]["data"]
self._pending_events[timestamp][-1].data = data
else:
ui._print(f"Unhandled message {message}")
self.flush_pending_events(ui)
def flush_pending_events(self, ui):
"""Flush pending events that are ready, i.e. have received both its symbol and data"""
for ts, events_stack in list(self._pending_events.items()):
while len(events_stack) > 0:
last_event = events_stack[-1] # Peek
if last_event.data == None:
return
for line in last_event.data["message"].splitlines():
if "<62706c69" in line:
encoded_bplist = line[
line.index("<") + 1 : line.index(">", -1)
].replace(" ", "")
cmd = f"echo {encoded_bplist} | xxd -r -p | fq d -V"
decoded_bplist = subprocess.check_output(
cmd, shell=True
).decode("utf-8")
payload = json.loads(decoded_bplist)
print(payload)
data = convert_array(
payload["objects"]["entries"][3]["value"]["entries"]
)
indices = data[1]
if len(indices["NS.objects"]) == 0:
continue
else:
indices = indices["NS.objects"]
print("-" * 40)
lines_printed = 0
for i in indices:
try:
if data[i]["$class"] in [4, 10, 12]:
replacement_str = data[
data[i]["NSReplacementString"]
]
promoted = "NSIsPromoted" in data[i]
if promoted:
print(f"*** {replacement_str} ***")
else:
print(replacement_str)
elif data[i]["$class"] == 6:
replacement_str = data[
data[i]["NSReplacementString"]
]
print(f"*** {replacement_str} ***")
else:
continue
lines_printed += 1
except:
print(traceback.format_exc())
print(data)
events_stack.pop()
del self._pending_events[ts]