Skip to content

Commit

Permalink
feat: overall properties evaluation
Browse files Browse the repository at this point in the history
  • Loading branch information
Otto-AA committed Aug 6, 2024
1 parent 519d52a commit 523177e
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 39 deletions.
36 changes: 21 additions & 15 deletions traces_analyzer/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@
from traces_analyzer.evaluation.financial_gain_loss_evaluation import (
FinancialGainLossEvaluation,
)
from traces_analyzer.evaluation.instruction_differences_evaluation import (
InstructionDifferencesEvaluation,
)
from traces_analyzer.evaluation.instruction_usage_evaluation import (
InstructionUsageEvaluation,
from traces_analyzer.evaluation.overall_properties_evaluation import (
OverallPropertiesEvaluation,
)
from traces_analyzer.evaluation.securify_properties_evaluation import (
SecurifyPropertiesEvaluation,
Expand Down Expand Up @@ -51,7 +48,6 @@
)
from traces_parser.parser.instructions.instructions import (
CALL,
STATICCALL,
)
from traces_parser.parser.instructions_parser import (
TransactionParsingInfo,
Expand Down Expand Up @@ -114,8 +110,18 @@ def analyze_transactions_in_dir(bundle: PotentialAttack, out_dir: Path, verbose:
verbose,
)

overall_properties_evaluation = OverallPropertiesEvaluation(
attackers=(bundle.tx_a.caller, bundle.tx_a.to),
victim=bundle.tx_b.caller,
securify_properties_evaluations=(evaluations_a[0], evaluations_b[0]), # type: ignore
financial_gain_loss_evaluations=(evaluations_a[1], evaluations_b[1]), # type: ignore
)

save_evaluations(evaluations_a, out_dir / f"{bundle.id}_{bundle.tx_a.hash}.json")
save_evaluations(evaluations_b, out_dir / f"{bundle.id}_{bundle.tx_b.hash}.json")
save_evaluations([overall_properties_evaluation], out_dir / f"{bundle.id}.json")

print(overall_properties_evaluation.cli_report())

if verbose:
print(f"Tx A: {bundle.tx_b.hash}")
Expand Down Expand Up @@ -265,15 +271,15 @@ def compare_traces(
currency_changes_analyzer.reverse.currency_changes,
),
TODSourceEvaluation(tod_source_analyzer.get_tod_source()),
InstructionDifferencesEvaluation(
occurrence_changes=instruction_changes_analyzer.get_instructions_only_executed_by_one_trace(),
input_changes=instruction_changes_analyzer.get_instructions_with_different_inputs(),
),
InstructionUsageEvaluation(
instruction_usage_analyzers.normal.get_used_opcodes_per_contract(),
instruction_usage_analyzers.reverse.get_used_opcodes_per_contract(),
filter_opcodes=[CALL.opcode, STATICCALL.opcode],
),
# InstructionDifferencesEvaluation(
# occurrence_changes=instruction_changes_analyzer.get_instructions_only_executed_by_one_trace(),
# input_changes=instruction_changes_analyzer.get_instructions_with_different_inputs(),
# ),
# InstructionUsageEvaluation(
# instruction_usage_analyzers.normal.get_used_opcodes_per_contract(),
# instruction_usage_analyzers.reverse.get_used_opcodes_per_contract(),
# filter_opcodes=[CALL.opcode, STATICCALL.opcode],
# ),
]

return evaluations
Expand Down
63 changes: 39 additions & 24 deletions traces_analyzer/evaluation/financial_gain_loss_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,30 +54,19 @@ def _cli_report(self) -> str:

return s

def get_gains_and_losses(self) -> GainsAndLosses:
return self._gains_and_losses


def compute_gains_and_losses(
changes_normal: Sequence[tuple[Instruction, CurrencyChange]],
changes_reverse: Sequence[tuple[Instruction, CurrencyChange]],
) -> GainsAndLosses:
# TODO: we should add T_A and T_B together and then compare them
grouped_normal = group_by_address(changes_normal)
grouped_reverse = group_by_address(changes_reverse)

net_changes = subtract_changes(grouped_normal, grouped_reverse)

gains: dict[str, dict[str, CurrencyChange]] = defaultdict(dict)
losses: dict[str, dict[str, CurrencyChange]] = defaultdict(dict)
for addr, changes in net_changes.items():
for key, change in changes.items():
if change["change"] > 0:
gains[addr][key] = change
if change["change"] < 0:
losses[addr][key] = change

return {
"gains": gains,
"losses": losses,
}
net_changes = add_changes(grouped_normal, negated_changes(grouped_reverse))
return split_to_gains_and_losses(net_changes)


def group_by_address(
Expand All @@ -96,15 +85,41 @@ def group_by_address(
return groups


def subtract_changes(base: CURRENCY_CHANGES_BY_ADDR, operand: CURRENCY_CHANGES_BY_ADDR):
def split_to_gains_and_losses(changes: CURRENCY_CHANGES_BY_ADDR) -> GainsAndLosses:
gains: dict[str, dict[str, CurrencyChange]] = defaultdict(dict)
losses: dict[str, dict[str, CurrencyChange]] = defaultdict(dict)
for addr, c in changes.items():
for key, change in c.items():
if change["change"] > 0:
gains[addr][key] = change
if change["change"] < 0:
losses[addr][key] = change

return {
"gains": gains,
"losses": losses,
}


def add_changes(base: CURRENCY_CHANGES_BY_ADDR, *operands: CURRENCY_CHANGES_BY_ADDR):
result = deepcopy(base)

for addr, changes in operand.items():
for key, change in changes.items():
if key not in result[addr]:
result[addr][key] = deepcopy(change)
result[addr][key]["change"] *= -1
else:
result[addr][key]["change"] -= change["change"]
for operand in operands:
for addr, changes in operand.items():
for key, change in changes.items():
if key not in result[addr]:
result[addr][key] = deepcopy(change)
else:
result[addr][key]["change"] += change["change"]

return result


def negated_changes(changes: CURRENCY_CHANGES_BY_ADDR):
result = deepcopy(changes)

for addr, c in result.items():
for key in c:
result[addr][key]["change"] *= -1

return result
132 changes: 132 additions & 0 deletions traces_analyzer/evaluation/overall_properties_evaluation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from typing_extensions import override
from traces_analyzer.evaluation.evaluation import Evaluation
from traces_analyzer.evaluation.financial_gain_loss_evaluation import (
FinancialGainLossEvaluation,
GainsAndLosses,
add_changes,
split_to_gains_and_losses,
)
from traces_analyzer.evaluation.securify_properties_evaluation import (
SecurifyProperties,
SecurifyPropertiesEvaluation,
)
from traces_parser.datatypes.hexstring import HexString


class OverallProperties(SecurifyProperties):
attacker_gain_and_victim_loss: bool
attacker_eoa_gain: bool
attacker_eoa_loss: bool
attacker_bot_gain: bool
attacker_bot_loss: bool
victim_gain: bool
victim_loss: bool


class OverallPropertiesEvaluation(Evaluation):
@property
@override
def _type_key(self):
return "overall_properties"

@property
@override
def _type_name(self):
return "Properties"

def __init__(
self,
attackers: tuple[HexString, HexString],
victim: HexString,
securify_properties_evaluations: tuple[
SecurifyPropertiesEvaluation, SecurifyPropertiesEvaluation
],
financial_gain_loss_evaluations: tuple[
FinancialGainLossEvaluation, FinancialGainLossEvaluation
],
):
super().__init__()
self._securify_properties = merge_securify_properties(
securify_properties_evaluations[0].get_properties(),
securify_properties_evaluations[1].get_properties(),
)
self._gains_and_losses = merge_financial_gain_loss(
financial_gain_loss_evaluations[0].get_gains_and_losses(),
financial_gain_loss_evaluations[1].get_gains_and_losses(),
)
self._props = compute_props(
self._securify_properties, self._gains_and_losses, attackers, victim
)
self.attackers = attackers
self.victim = victim

@override
def _dict_report(self) -> dict:
return {
"properties": dict(self._props),
"attacker_EOA": self.attackers[0].with_prefix(),
"attacker_potential_bot": self.attackers[1].with_prefix(),
"victim": self.victim.with_prefix(),
"overall_gains_and_losses": dict(self._gains_and_losses),
}

@override
def _cli_report(self) -> str:
return f"""Attacker gain and victim loss: {self._props['attacker_gain_and_victim_loss']}
TOD Transfer: {self._props['TOD_Transfer']}
TOD Amount: {self._props['TOD_Amount']}
TOD Receiver: {self._props['TOD_Receiver']}"""


def merge_securify_properties(
props_a: SecurifyProperties, props_b: SecurifyProperties
) -> SecurifyProperties:
return {
"TOD_Transfer": props_a["TOD_Transfer"] or props_b["TOD_Transfer"],
"TOD_Amount": props_a["TOD_Amount"] or props_b["TOD_Amount"],
"TOD_Receiver": props_a["TOD_Receiver"] or props_b["TOD_Receiver"],
}


def merge_financial_gain_loss(a: GainsAndLosses, b: GainsAndLosses):
overall_changes = add_changes(a["gains"], a["losses"], b["gains"], b["losses"])
return split_to_gains_and_losses(overall_changes)


def compute_props(
securify_props: SecurifyProperties,
gains_and_losses: GainsAndLosses,
attackers: tuple[HexString, HexString],
victim: HexString,
) -> OverallProperties:
attacker_eoa_gain, attacker_eoa_loss = check_gain_loss(
gains_and_losses, attackers[0]
)
attacker_bot_gain, attacker_bot_loss = check_gain_loss(
gains_and_losses, attackers[1]
)
victim_gain, victim_loss = check_gain_loss(gains_and_losses, victim)

overall_prop = (
(attacker_eoa_gain and not attacker_eoa_loss)
or (attacker_bot_gain and not attacker_bot_loss)
) and (victim_loss and not victim_gain)

return {
"TOD_Transfer": securify_props["TOD_Transfer"],
"TOD_Amount": securify_props["TOD_Amount"],
"TOD_Receiver": securify_props["TOD_Receiver"],
"attacker_gain_and_victim_loss": overall_prop,
"attacker_eoa_gain": attacker_eoa_gain,
"attacker_eoa_loss": attacker_eoa_loss,
"attacker_bot_gain": attacker_bot_gain,
"attacker_bot_loss": attacker_bot_loss,
"victim_gain": victim_gain,
"victim_loss": victim_loss,
}


def check_gain_loss(gains_and_losses: GainsAndLosses, address: HexString):
gains = address.with_prefix().lower() in gains_and_losses["gains"]
losses = address.with_prefix().lower() in gains_and_losses["losses"]
return gains, losses
3 changes: 3 additions & 0 deletions traces_analyzer/evaluation/securify_properties_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ def _cli_report(self) -> str:
TOD Receiver: {self._properties['TOD_Receiver']}
"""

def get_properties(self) -> SecurifyProperties:
return self._properties


def check_securify_properties(
calls_normal: CALLS_BY_LOC, calls_reverse: CALLS_BY_LOC
Expand Down

0 comments on commit 523177e

Please sign in to comment.