-
Notifications
You must be signed in to change notification settings - Fork 6
/
injector_class.py
executable file
·590 lines (526 loc) · 25.7 KB
/
injector_class.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
# Copyright lowRISC contributors.
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
import copy
import gc
import logging
import sys
from collections import defaultdict
from typing import DefaultDict

import networkx as nx
import ray
from pysat.solvers import Minisat22

import helpers
from formula_class import FormulaBuilder
from helpers import Edge, FIResult, Node, NodePin, NodePort
logger = logging.getLogger()
@ray.remote
class FiInjector:
    """ Class for performing distributed fault injections.

    This class provides functionality to inject faults into the target circuit
    according to the fault model, to create the differential graph, to
    extract the boolean CNF formula of this graph, and to hand the formula to
    a SAT solver.

    The class is decorated with ``ray.remote``; ``perform_attack`` is the
    only public method, all other methods are internal build steps.
    """

    def __init__(self, fault_name, target_graph, graph, fault_locations,
                 fault_model, cell_lib):
        """ Inits the injector class.

        Only stores the arguments; no graph is modified here.

        Args:
            fault_name: The fault model name of the current attack.
            target_graph: The graph to be evaluated.
            graph: The unmodified graph of the whole circuit.
            fault_locations: The location and fault mapping of the faults.
            fault_model: The fault model.
            cell_lib: The imported cell library.
        """
        self.fault_name = fault_name
        self.target_graph = target_graph
        self.graph = graph
        self.fault_locations = fault_locations
        self.fault_model = fault_model
        self.cell_lib = cell_lib

    def _inject_faults(self, fault_location):
        """ Inject faults into the target graph according to the fault location.

        This function injects faults into a deep copy of the target graph by
        replacing the type of each target node with the fault type. If the
        port naming of the original and the faulty gate differ, the port names
        of the node and of its adjacent edges are remapped using the cell
        library. Terminates the process if a gate type is not found in the
        cell library.

        Args:
            fault_location: The current fault location (location+gate mapping).

        Returns:
            The faulty target graph.
        """
        faulty_graph = copy.deepcopy(self.target_graph)
        for fl in fault_location:
            stage = fl.stage
            node_target = fl.location
            # Find the nodes in the target graph which are replaced with the
            # faulty nodes.
            nodes = [
                n for n, d in self.target_graph.nodes(data=True)
                if (d["node"].name == node_target and d["node"].stage == stage)
            ]
            for node in nodes:
                faulty_node = faulty_graph.nodes[node]["node"]
                # Check if the cell type is in the cell library.
                type_found, current_type = helpers.check_gate_type(
                    faulty_node.type, self.cell_lib.gate_in_type)
                if not type_found:
                    logger.error(f"Err: Gate type {current_type} not found.")
                    sys.exit()
                # Check if the fault cell type is in the cell library. Always
                # resolve from the requested mapping so that the lookup for a
                # second matching node does not start from the result of the
                # previous iteration.
                type_found, fault_type = helpers.check_gate_type(
                    fl.mapping, self.cell_lib.gate_in_type)
                if not type_found:
                    logger.error(f"Err: Gate type {fault_type} not found.")
                    sys.exit()
                # Replace the type of the gate.
                faulty_node.type = fault_type
                faulty_node.node_color = "red"
                # Update the input ports.
                gate_in_type_current = self.cell_lib.gate_in_type[current_type]
                gate_in_type_faulty = self.cell_lib.gate_in_type[fault_type]
                if gate_in_type_current != gate_in_type_faulty:
                    # We need to remap the input ports as the input type
                    # mismatches.
                    in_port_mapping = self.cell_lib.port_in_mapping[
                        gate_in_type_current][gate_in_type_faulty]
                    # Update the port name in the node dict.
                    for port in faulty_node.in_ports:
                        port.name = in_port_mapping[port.name]
                    # Update the port name in the edges. Iterating with
                    # data=True visits every edge exactly once, so parallel
                    # edges between the same two nodes are not remapped
                    # repeatedly.
                    for _, _, edge_data in faulty_graph.in_edges(node,
                                                                 data=True):
                        edge = edge_data["edge"]
                        edge.in_port = in_port_mapping[edge.in_port]
                # Update the output ports.
                gate_out_type_current = self.cell_lib.gate_out_type[
                    current_type]
                gate_out_type_faulty = self.cell_lib.gate_out_type[fault_type]
                if gate_out_type_current != gate_out_type_faulty:
                    # We need to remap the output pins as the output type
                    # mismatches.
                    out_port_mapping = self.cell_lib.port_out_mapping[
                        gate_out_type_current][gate_out_type_faulty]
                    # Update the port name in the node dict.
                    for port in faulty_node.out_ports:
                        port.name = out_port_mapping[port.name]
                    # Update the port name in the edges (once per edge). Ports
                    # without an entry in the mapping are left untouched.
                    for _, _, edge_data in faulty_graph.out_edges(node,
                                                                  data=True):
                        edge = edge_data["edge"]
                        if edge.out_port in out_port_mapping:
                            edge.out_port = out_port_mapping[edge.out_port]
        return faulty_graph

    def _check_port_pin(self, node, ports, port_name, pin_number):
        """ Check if the user provided port and pin for an input/output node
        is valid. Terminates on an error.

        Args:
            node: The provided input/output node.
            ports: The ports of the corresponding node.
            port_name: The provided port name.
            pin_number: The provided pin number (converted with int()).
        """
        # Nothing to validate if the node has no ports.
        if not ports:
            return
        for port in ports:
            if port.name == port_name:
                for pin in port.pins:
                    if pin.number == int(pin_number):
                        return
        logger.error(
            f"Provided port/pin {port_name}/{pin_number} for node {node} not found."
        )
        sys.exit()

    def _add_in_logic(self, diff_graph):
        """ Add the input logic to the differential graph.

        In the input logic, the input nodes defined in the fault model are
        connected with their predefined value. The passed graph is not
        modified; the logic is added to a deep copy.

        Args:
            diff_graph: The differential graph.

        Returns:
            The differential graph with the input nodes.
        """
        diff_graph_in_logic = copy.deepcopy(diff_graph)
        # Add the null and one node for the predefined values.
        diff_graph_in_logic.add_node(
            "null", **{
                "node":
                Node(name="null",
                     parent_name="null",
                     type="null_node",
                     in_ports=[],
                     out_ports=[
                         NodePort(name="O",
                                  type="output",
                                  pins=[NodePin(number=0, wire="0")])
                     ],
                     stage="",
                     node_color="black")
            })
        diff_graph_in_logic.add_node(
            "one", **{
                "node":
                Node(name="one",
                     parent_name="one",
                     type="one_node",
                     in_ports=[],
                     out_ports=[
                         NodePort(name="O",
                                  type="output",
                                  pins=[NodePin(number=0, wire="1")])
                     ],
                     stage="",
                     node_color="black")
            })
        # Get the input values from the fault model and connect each input node
        # with the null / one node.
        input_values = self.fault_model["input_values"]
        for node_name, value in input_values.items():
            # Find all input nodes and connect with node.
            nodes = [
                n for n, d in diff_graph.nodes(data=True)
                if (d["node"].parent_name == node_name and (
                    d["node"].type == "input"
                    or d["node"].type == "input_fault"))
            ]
            for node in nodes:
                # Fetch the data for each pin of the provided in node.
                for port_name, port in value.items():
                    for pin_number, pin_value in port.items():
                        # Select the constant node driving this pin: a value
                        # of 1 selects "one", everything else "null".
                        value_str = "one" if pin_value == 1 else "null"
                        # Check if the provided port/pin exists for the node.
                        self._check_port_pin(
                            node, diff_graph.nodes[node]["node"].in_ports,
                            port_name, pin_number)
                        # Connect the null/one node with the input node.
                        edge = Edge(in_port=port_name,
                                    in_pin=pin_number,
                                    out_port="O",
                                    out_pin=0,
                                    wire="")
                        diff_graph_in_logic.add_edge(value_str,
                                                     node,
                                                     edge=edge)
        return diff_graph_in_logic

    def _add_xor_xnor_nodes(self, diff_graph_out_logic, node, value, node_name,
                            node_type, out_port, out_pin):
        """ Add the xor/xnor node.

        In this function, the actual node for the comparison is added to the
        graph. The new node gets two incoming edges: one from the null/one
        node (selected by value) and one from the compared node. The graph is
        modified in place.

        Args:
            diff_graph_out_logic: The differential graph with the connected output.
            node: The current node.
            value: The output value set in the fault model.
            node_name: The name of the new node.
            node_type: The type of the new node.
            out_port: The port of the current node feeding the new node.
            out_pin: The pin of the current node feeding the new node.
        """
        diff_graph_out_logic.add_node(
            node_name, **{
                "node":
                Node(name=node_name,
                     parent_name=node_name,
                     type=node_type,
                     in_ports=[
                         NodePort(type="input",
                                  name="I",
                                  pins=[
                                      NodePin(number=0, wire=""),
                                      NodePin(number=1, wire="")
                                  ])
                     ],
                     out_ports=[
                         NodePort(type="output",
                                  name="O",
                                  pins=[NodePin(number=0, wire="")])
                     ],
                     stage="out_stage",
                     node_color="purple")
            })
        null_one = "one" if value == 1 else "null"
        # Add the connection between the null/one and the XOR/XNOR node.
        edge = Edge(in_port="I0", in_pin=0, out_port="O", out_pin=0, wire="")
        diff_graph_out_logic.add_edge(null_one, node_name, edge=edge)
        # Add the connection between the output and the XOR/XNOR node.
        edge = Edge(in_port="I1",
                    in_pin=0,
                    out_port=out_port,
                    out_pin=int(out_pin),
                    wire="")
        diff_graph_out_logic.add_edge(node, node_name, edge=edge)

    def _add_output(self, diff_graph_out_logic, diff_graph, values, faulty,
                    alert, exp_fault):
        """ Add the output logic.

        In this function, the comparison with the output value, the alert value
        or the expected fault value is done. Depending on the mode of comparison
        we use XOR or XNOR functions to do this comparison.

        With faulty set, only nodes whose name contains "_faulty" get a
        comparison node (XOR, or XNOR if alert or exp_fault is set). Without
        faulty, only the remaining nodes get an XNOR.

        Args:
            diff_graph_out_logic: The differential graph with the connected
                output. Modified in place.
            diff_graph: The differential graph (only read).
            values: The output values set in the fault model.
            faulty: Is the node a faulty node?
            alert: Is the node an alert node?
            exp_fault: Is the node an expected fault value node?

        Returns:
            A list with the names of the added nodes.
        """
        out_nodes_added = []
        for node_target, ports in values.items():
            nodes = [
                n for n, d in diff_graph.nodes(data=True)
                if (d["node"].parent_name == node_target and (
                    d["node"].type == "output"
                    or d["node"].type == "output_fault"))
            ]
            for node in nodes:
                # Check if all output ports are connected. If not connected
                # ignore.
                node_in_ports = diff_graph.nodes[node]["node"].in_ports
                unconnected_ports = defaultdict(list)
                if len(diff_graph.in_edges(node)) != len(node_in_ports):
                    for in_port in node_in_ports:
                        for in_pin in in_port.pins:
                            # A pin counts as connected if any incoming edge
                            # carries the same wire.
                            wire_connected = any(
                                in_pin.wire == data["edge"].wire
                                for _, _, data in diff_graph.in_edges(
                                    node, data=True))
                            if not wire_connected:
                                unconnected_ports[in_port.name].append(
                                    in_pin.number)
                # Fetch the port/pin values provided in the FI model.
                for port_name, port in ports.items():
                    for pin_number, pin_value in port.items():
                        # Check if port and pin is connected. If not connected
                        # skip.
                        port_connected = not (
                            port_name in unconnected_ports
                            and int(pin_number) in unconnected_ports[port_name])
                        if port_connected:
                            # If the node consists of output edges in the orig.
                            # graph, check the provided ports/pins.
                            if self.graph.out_edges(node):
                                self._check_port_pin(
                                    node,
                                    diff_graph.nodes[node]["node"].out_ports,
                                    port_name, pin_number)
                            node_id = node + "_" + port_name + "_" + str(
                                pin_number)
                            if faulty:
                                if "_faulty" in node:
                                    if alert or exp_fault:
                                        node_name = node_id + "_xnor_alert_exp"
                                        node_type = "xnor"
                                    else:
                                        node_name = node_id + "_xor"
                                        node_type = "xor"
                                    out_nodes_added.append(node_name)
                                    self._add_xor_xnor_nodes(
                                        diff_graph_out_logic, node, pin_value,
                                        node_name, node_type, port_name,
                                        pin_number)
                            else:
                                if "_faulty" not in node:
                                    node_name = node_id + "_xnor"
                                    if alert:
                                        node_name = node_id + "_xnor_alert"
                                    node_type = "xnor"
                                    out_nodes_added.append(node_name)
                                    self._add_xor_xnor_nodes(
                                        diff_graph_out_logic, node, pin_value,
                                        node_name, node_type, port_name,
                                        pin_number)
                        # Update the out_ports of the node: append the pin to
                        # an existing port of that name or add a new port.
                        # NOTE(review): this runs for every listed pin, also
                        # for pins skipped above as unconnected - confirm that
                        # this is intended.
                        add_port = True
                        for out_port in diff_graph_out_logic.nodes[node][
                                "node"].out_ports:
                            if out_port.name == port_name:
                                out_port.pins.append(
                                    NodePin(number=int(pin_number), wire=""))
                                add_port = False
                                break
                        if add_port:
                            diff_graph_out_logic.nodes[node][
                                "node"].out_ports.append(
                                    NodePort(type="output",
                                             name=port_name,
                                             pins=[
                                                 NodePin(
                                                     number=int(pin_number),
                                                     wire="")
                                             ]))
        return out_nodes_added

    def _connect_outputs(self, diff_graph_out_logic, out_nodes, out_logic):
        """ Connect a list of nodes with an output node.

        Each node in the out_nodes list is connected with a newly added
        output node of type out_logic. The graph is modified in place.

        Args:
            diff_graph_out_logic: The differential graph.
            out_nodes: The nodes to connect (a list of lists of node names).
            out_logic: The type of the node.

        Returns:
            The name of the new node.
        """
        # Flatten the list.
        out_nodes = [item for sublist in out_nodes for item in sublist]
        # Add the node combining the outputs, e.g., of the XNOR/XORs.
        out_name = "output_logic_" + out_logic
        diff_graph_out_logic.add_node(
            out_name, **{
                "node":
                Node(name=out_name,
                     parent_name=out_name,
                     type=out_logic,
                     in_ports=[NodePort(type="input", name="I", pins=[])],
                     out_ports=[
                         NodePort(type="output",
                                  name="O",
                                  pins=[NodePin(number=0, wire="")])
                     ],
                     stage="",
                     node_color="purple")
            })
        in_pins = diff_graph_out_logic.nodes[out_name]["node"].in_ports[0].pins
        # The input pins / ports are numbered starting with 1.
        for cntr, out_node in enumerate(out_nodes, start=1):
            in_pins.append(NodePin(number=cntr, wire=""))
            diff_graph_out_logic.add_edge(out_node,
                                          out_name,
                                          edge=Edge(in_port="A" + str(cntr),
                                                    in_pin=0,
                                                    out_port="O",
                                                    out_pin=0,
                                                    wire=""))
        return out_name

    def _add_out_logic(self, diff_graph):
        """ Add the output logic to the differential graph.

        For the non-faulty graph in the differential graph:
        -XNORs are used to compare the output to the expected output.
        For the faulty graph in the differential graph:
        -XORs are used to compare the output to the expected output.
        -XNORs are used to compare the alert signals to the expected
         alert output signals. We are using XNORs as we are only interested in
         faults manipulating the output but not the alert signal.
        All XNORs/XORs are ANDed to produce the final circuit. If the fault
        model contains no expected fault values, the XORs of the faulty
        outputs are ORed first and the OR output is fed into the AND.

        Args:
            diff_graph: The differential graph.

        Returns:
            The differential graph with the output logic (a deep copy).
        """
        diff_graph_out_logic = copy.deepcopy(diff_graph)
        output_values = self.fault_model["output_values"]
        alert_values = self.fault_model["alert_values"]
        fault_values = self.fault_model["output_fault_values"]
        out_and_nodes = []
        out_or_nodes = []
        # Add the output logic for the non-faulty output values.
        out_and_nodes.append(
            self._add_output(diff_graph_out_logic, diff_graph, output_values,
                             False, False, False))
        # Add the output logic for the faulty output values. If there are
        # expected fault values, connect them with an AND, else with an OR.
        if not fault_values:
            out_or_nodes.append(
                self._add_output(diff_graph_out_logic, diff_graph,
                                 output_values, True, False, False))
        else:
            out_and_nodes.append(
                self._add_output(diff_graph_out_logic, diff_graph,
                                 fault_values, True, False, True))
        if alert_values:
            # Add the alert logic for the non-faulty alert values.
            out_and_nodes.append(
                self._add_output(diff_graph_out_logic, diff_graph,
                                 alert_values, False, True, False))
            # Add the alert logic for the faulty alert values.
            out_and_nodes.append(
                self._add_output(diff_graph_out_logic, diff_graph,
                                 alert_values, True, True, False))
        # AND all output nodes (non-faulty, alert, expected fault values).
        and_output = self._connect_outputs(diff_graph_out_logic, out_and_nodes,
                                           "and")
        # Connect the AND node with the terminate node.
        self._connect_outputs(diff_graph_out_logic, [[and_output]],
                              "terminate")
        if not fault_values:
            # OR all faulty output nodes.
            or_output = self._connect_outputs(diff_graph_out_logic,
                                              out_or_nodes, "or")
            # Connect the output of the OR logic with the AND output logic.
            # The AND inputs are numbered from 1, so the OR output uses the
            # next free input.
            num_in_edges = len(diff_graph_out_logic.in_edges(and_output)) + 1
            edge = Edge(in_port="A" + str(num_in_edges),
                        in_pin=0,
                        out_port="O",
                        out_pin=0,
                        wire="")
            diff_graph_out_logic.add_edge(or_output, and_output, edge=edge)
        return diff_graph_out_logic

    def _create_diff_graph(self, faulty_graph):
        """ Create the differential graph based on the faulty graph.

        This function creates the differential graph by merging the faulty graph
        and the unmodified target graph into a common graph. The inputs of the
        differential graph are set to predefined values specified in the fault
        model. The output is compared to a predefined value using a XNOR. To
        get a single output port, the output of the XNORs are connected using a
        AND.

        Args:
            faulty_graph: The target graph with the faulty nodes.

        Returns:
            The differential graph.
        """
        # Work on copies so that neither the target graph nor the passed
        # faulty graph is touched by the following steps.
        orig_graph = copy.deepcopy(self.target_graph)
        faulty_graph_renamed = copy.deepcopy(faulty_graph)
        # Rename the faulty graph.
        faulty_graph_renamed = helpers.rename_nodes(faulty_graph_renamed,
                                                    "_faulty", True)
        # Merge the graphs into a common graph.
        diff_graph = nx.compose(orig_graph, faulty_graph_renamed)
        # Add the input logic. Here, the values are set to a defined value.
        diff_graph_in_logic = self._add_in_logic(diff_graph)
        # Add the output logic. The output logic compares the result with the
        # expected result stored in "output_values" of the fault model.
        diff_graph_out_logic = self._add_out_logic(diff_graph_in_logic)
        return diff_graph_out_logic

    def perform_attack(self) -> list:
        """ Perform the attack.

        Here, the attack on the target graph is conducted. For each fault
        location, a faulty graph is created first. In this graph, the target
        fault nodes are replaced according to the fault mapping. Then, the
        differential graph is created, converted to a boolean formula, and
        handed to the SAT solver.

        Returns:
            A list with one FIResult per entry of fault_locations.
        """
        results = []
        for fault_location in self.fault_locations:
            # Inject the faults into the target graph.
            faulty_graph = self._inject_faults(fault_location)
            # Create the differential graph.
            diff_graph = self._create_diff_graph(faulty_graph)
            solver = Minisat22()
            try:
                # Transform the differential graph to a boolean formula
                formula_builder = FormulaBuilder(diff_graph, self.cell_lib,
                                                 solver)
                solver = formula_builder.transform_graph()
                # Set one to logical one and zero to logical zero.
                solver.add_clause([self.cell_lib.one])
                solver.add_clause([-self.cell_lib.zero])
                # Hand the boolean formula to the SAT solver.
                sat_result = solver.solve()
            finally:
                # Cleanup, also if building or solving the formula failed.
                solver.delete()
            # Append result.
            results.append(
                FIResult(fault_name=self.fault_name,
                         sat_result=sat_result,
                         fault_location=fault_location))
        return results