Skip to content

Commit

Permalink
Merge pull request #986 from mandiant/feature-981
Browse files Browse the repository at this point in the history
add Address abstraction
  • Loading branch information
williballenthin authored Jun 21, 2022
2 parents 2ceed78 + be2dffe commit fb99ef5
Show file tree
Hide file tree
Showing 67 changed files with 3,484 additions and 2,310 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
- add unmanaged call characteristic for dotnet files #1023 @mike-hunhoff
- add mixed mode characteristic feature extraction for dotnet files #1024 @mike-hunhoff
- emit class and namespace features for dotnet files #1030 @mike-hunhoff
- render: support Addresses that aren't simple integers, like .NET token+offset #981 @williballenthin

### Breaking Changes

- instruction scope and operand feature are new and are not backwards compatible with older versions of capa
- Python 3.7 is now the minimum supported Python version #866 @williballenthin
- remove /x32 and /x64 flavors of number and operand features #932 @williballenthin
- the tool now accepts multiple paths to rules, and JSON doc updated accordingly @williballenthin
- extractors must use handles to identify functions/basic blocks/instructions #981 @williballenthin
- the freeze file format schema was updated, including format version bump to v2 #986 @williballenthin

### New Rules (7)

Expand Down
13 changes: 7 additions & 6 deletions capa/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import capa.perf
import capa.features.common
from capa.features.common import Result, Feature
from capa.features.address import Address

if TYPE_CHECKING:
# circular import, otherwise
Expand All @@ -26,7 +27,7 @@
# to collect the locations of a feature, do: `features[Number(0x10)]`
#
# aliased here so that the type can be documented and xref'd.
FeatureSet = Dict[Feature, Set[int]]
FeatureSet = Dict[Feature, Set[Address]]


class Statement:
Expand Down Expand Up @@ -257,10 +258,10 @@ def evaluate(self, ctx, **kwargs):
# inspect(match_details)
#
# aliased here so that the type can be documented and xref'd.
MatchResults = Mapping[str, List[Tuple[int, Result]]]
MatchResults = Mapping[str, List[Tuple[Address, Result]]]


def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations: Iterable[int]):
def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations: Iterable[Address]):
"""
record into the given featureset that the given rule matched at the given locations.
Expand All @@ -277,7 +278,7 @@ def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations:
namespace, _, _ = namespace.rpartition("/")


def match(rules: List["capa.rules.Rule"], features: FeatureSet, va: int) -> Tuple[FeatureSet, MatchResults]:
def match(rules: List["capa.rules.Rule"], features: FeatureSet, addr: Address) -> Tuple[FeatureSet, MatchResults]:
"""
match the given rules against the given features,
returning an updated set of features and the matches.
Expand Down Expand Up @@ -315,10 +316,10 @@ def match(rules: List["capa.rules.Rule"], features: FeatureSet, va: int) -> Tupl
# sanity check
assert bool(res) is True

results[rule.name].append((va, res))
results[rule.name].append((addr, res))
# we need to update the current `features`
# because subsequent iterations of this loop may use newly added features,
# such as rule or namespace matches.
index_rule_matches(features, rule, [va])
index_rule_matches(features, rule, [addr])

return (features, results)
110 changes: 110 additions & 0 deletions capa/features/address.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import abc

from dncil.clr.token import Token


class Address(abc.ABC):
    """
    abstract base class for the address types defined in this module.

    every concrete address must support equality, ordering, hashing,
    and a debug representation, so these four methods are abstract.
    """

    @abc.abstractmethod
    def __repr__(self):
        """render the address as a string, to help during debugging."""

    @abc.abstractmethod
    def __hash__(self):
        """hash the address, so that addresses can be used in sets and dicts."""

    @abc.abstractmethod
    def __lt__(self, other):
        """compare with `other` using `<`, so that addresses can be sorted from low to high."""

    @abc.abstractmethod
    def __eq__(self, other):
        """compare this address with `other` for equality."""


class AbsoluteVirtualAddress(int, Address):
    """
    an absolute memory address

    this is an `int` subclass: equality, ordering, and hashing are inherited from `int`.
    """

    def __new__(cls, v):
        # absolute addresses are never negative.
        assert v >= 0
        # `int` is the next class in the MRO, so this builds the underlying integer.
        return super().__new__(cls, v)

    def __repr__(self):
        return "absolute(0x" + format(self, "x") + ")"


class RelativeVirtualAddress(int, Address):
    """
    a memory address relative to a base address

    this is an `int` subclass: equality, ordering, and hashing are inherited from `int`.
    unlike the other int-based addresses in this module, the value is not range-checked here.
    """

    def __repr__(self):
        return "relative(0x" + format(self, "x") + ")"


class FileOffsetAddress(int, Address):
    """
    an address relative to the start of a file

    this is an `int` subclass: equality, ordering, and hashing are inherited from `int`.
    """

    def __new__(cls, v):
        # an offset from the start of a file is never negative.
        assert v >= 0
        # `int` is the next class in the MRO, so this builds the underlying integer.
        return super().__new__(cls, v)

    def __repr__(self):
        return "file(0x" + format(self, "x") + ")"


class DNTokenAddress(Address):
    """
    a .NET token

    instances compare, order, and hash by the integer `token.value`.
    comparisons against anything that is not a `DNTokenAddress` are declined
    (by returning `NotImplemented`) rather than assuming `other.token` exists.
    """

    def __init__(self, token: Token):
        """
        args:
          token: the .NET token that identifies the location; its `.value` is used for comparisons.
        """
        self.token = token

    def __eq__(self, other):
        # addresses of different types can meet in the same set or dict.
        # returning NotImplemented lets Python try the reflected comparison
        # (and fall back to "not equal") instead of raising AttributeError here.
        # it also keeps equality consistent with `__hash__`, which is derived
        # only from `token.value`.
        if not isinstance(other, DNTokenAddress):
            return NotImplemented
        return self.token.value == other.token.value

    def __lt__(self, other):
        # ordering is only defined among token addresses;
        # for anything else, Python raises TypeError once both operands decline.
        if not isinstance(other, DNTokenAddress):
            return NotImplemented
        return self.token.value < other.token.value

    def __hash__(self):
        return hash(self.token.value)

    def __repr__(self):
        return f"token(0x{self.token.value:x})"


class DNTokenOffsetAddress(Address):
    """
    an offset into an object specified by a .NET token

    instances compare, order, and hash by the pair `(token.value, offset)`.
    comparisons against anything that is not a `DNTokenOffsetAddress` are declined
    (by returning `NotImplemented`) rather than assuming `other.token`/`other.offset` exist.
    """

    def __init__(self, token: Token, offset: int):
        """
        args:
          token: the .NET token that identifies the containing object.
          offset: non-negative offset into that object.
        """
        assert offset >= 0
        self.token = token
        self.offset = offset

    def __eq__(self, other):
        # addresses of different types can meet in the same set or dict.
        # returning NotImplemented lets Python try the reflected comparison
        # (and fall back to "not equal") instead of raising AttributeError here.
        if not isinstance(other, DNTokenOffsetAddress):
            return NotImplemented
        return (self.token.value, self.offset) == (other.token.value, other.offset)

    def __lt__(self, other):
        # ordering is only defined among token+offset addresses;
        # for anything else, Python raises TypeError once both operands decline.
        if not isinstance(other, DNTokenOffsetAddress):
            return NotImplemented
        # tuple comparison: order by token value first, then by offset.
        return (self.token.value, self.offset) < (other.token.value, other.offset)

    def __hash__(self):
        return hash((self.token.value, self.offset))

    def __repr__(self):
        return f"token(0x{self.token.value:x})+(0x{self.offset:x})"


class _NoAddress(Address):
    """
    address type for the `NO_ADDRESS` placeholder defined below.

    it compares equal to everything, is never less than anything,
    and always hashes to the same value.
    """

    def __repr__(self):
        return "no address"

    def __hash__(self):
        # constant hash: every instance lands in the same bucket.
        return hash(0)

    def __lt__(self, other):
        return False

    def __eq__(self, other):
        # NOTE(review): this reports equality with *any* object, including real addresses;
        # confirm that callers rely on this before changing it.
        return True


# shared placeholder instance of `_NoAddress`.
NO_ADDRESS = _NoAddress()
11 changes: 2 additions & 9 deletions capa/features/basicblock.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,11 @@


class BasicBlock(Feature):
def __init__(self):
super(BasicBlock, self).__init__(None)
def __init__(self, description=None):
super(BasicBlock, self).__init__(None, description=description)

def __str__(self):
return "basic block"

def get_value_str(self):
return ""

def freeze_serialize(self):
return (self.__class__.__name__, [])

@classmethod
def freeze_deserialize(cls, args):
return cls()
69 changes: 24 additions & 45 deletions capa/features/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import codecs
import logging
import collections
from typing import TYPE_CHECKING, Set, Dict, List, Union
from typing import TYPE_CHECKING, Set, Dict, List, Union, Optional, Sequence

if TYPE_CHECKING:
# circular import, otherwise
Expand All @@ -20,6 +20,7 @@
import capa.perf
import capa.features
import capa.features.extractors.elf
from capa.features.address import Address

logger = logging.getLogger(__name__)
MAX_BYTES_FEATURE_SIZE = 0x100
Expand Down Expand Up @@ -70,20 +71,13 @@ def __init__(
success: bool,
statement: Union["capa.engine.Statement", "Feature"],
children: List["Result"],
locations=None,
locations: Optional[Set[Address]] = None,
):
"""
args:
success (bool)
statement (capa.engine.Statement or capa.features.Feature)
children (list[Result])
locations (iterable[VA])
"""
super(Result, self).__init__()
self.success = success
self.statement = statement
self.children = children
self.locations = locations if locations is not None else ()
self.locations = locations if locations is not None else set()

def __eq__(self, other):
if isinstance(other, bool):
Expand All @@ -98,7 +92,7 @@ def __nonzero__(self):


class Feature(abc.ABC):
def __init__(self, value: Union[str, int, bytes], description=None):
def __init__(self, value: Union[str, int, float, bytes], description=None):
"""
Args:
value (any): the value of the feature, such as the number or string.
Expand All @@ -116,6 +110,15 @@ def __hash__(self):
def __eq__(self, other):
return self.name == other.name and self.value == other.value

def __lt__(self, other):
# TODO: this is a huge hack!
import capa.features.freeze.features

return (
capa.features.freeze.features.feature_from_capa(self).json()
< capa.features.freeze.features.feature_from_capa(other).json()
)

def get_value_str(self) -> str:
"""
render the value of this feature, for use by `__str__` and friends.
Expand All @@ -137,27 +140,10 @@ def __str__(self):
def __repr__(self):
return str(self)

def evaluate(self, ctx: Dict["Feature", Set[int]], **kwargs) -> Result:
def evaluate(self, ctx: Dict["Feature", Set[Address]], **kwargs) -> Result:
capa.perf.counters["evaluate.feature"] += 1
capa.perf.counters["evaluate.feature." + self.name] += 1
return Result(self in ctx, self, [], locations=ctx.get(self, []))

def freeze_serialize(self):
return (self.__class__.__name__, [self.value])

@classmethod
def freeze_deserialize(cls, args):
# as you can see below in code,
# if the last argument is a dictionary,
# consider it to be kwargs passed to the feature constructor.
if len(args) == 1:
return cls(*args)
elif isinstance(args[-1], dict):
kwargs = args[-1]
args = args[:-1]
return cls(*args, **kwargs)
else:
return cls(*args)
return Result(self in ctx, self, [], locations=ctx.get(self, set()))


class MatchedRule(Feature):
Expand Down Expand Up @@ -230,7 +216,7 @@ def evaluate(self, ctx, short_circuit=True):
# instead, return a new instance that has a reference to both the substring and the matched values.
return Result(True, _MatchedSubstring(self, matches), [], locations=locations)
else:
return Result(False, _MatchedSubstring(self, None), [])
return Result(False, _MatchedSubstring(self, {}), [])

def __str__(self):
return "substring(%s)" % self.value
Expand All @@ -244,11 +230,11 @@ class _MatchedSubstring(Substring):
note: this type should only ever be constructed by `Substring.evaluate()`. it is not part of the public API.
"""

def __init__(self, substring: Substring, matches):
def __init__(self, substring: Substring, matches: Dict[str, Set[Address]]):
"""
args:
substring (Substring): the substring feature that matches.
match (Dict[string, List[int]]|None): mapping from matching string to its locations.
substring: the substring feature that matches.
matches: mapping from matching string to its locations.
"""
super(_MatchedSubstring, self).__init__(str(substring.value), description=substring.description)
# we want this to collide with the name of `Substring` above,
Expand Down Expand Up @@ -327,7 +313,7 @@ def evaluate(self, ctx, short_circuit=True):
# see #262.
return Result(True, _MatchedRegex(self, matches), [], locations=locations)
else:
return Result(False, _MatchedRegex(self, None), [])
return Result(False, _MatchedRegex(self, {}), [])

def __str__(self):
return "regex(string =~ %s)" % self.value
Expand All @@ -341,11 +327,11 @@ class _MatchedRegex(Regex):
note: this type should only ever be constructed by `Regex.evaluate()`. it is not part of the public API.
"""

def __init__(self, regex: Regex, matches):
def __init__(self, regex: Regex, matches: Dict[str, Set[Address]]):
"""
args:
regex (Regex): the regex feature that matches.
match (Dict[string, List[int]]|None): mapping from matching string to its locations.
regex: the regex feature that matches.
matches: mapping from matching string to its locations.
"""
super(_MatchedRegex, self).__init__(str(regex.value), description=regex.description)
# we want this to collide with the name of `Regex` above,
Expand Down Expand Up @@ -389,13 +375,6 @@ def evaluate(self, ctx, **kwargs):
def get_value_str(self):
return hex_string(bytes_to_str(self.value))

def freeze_serialize(self):
return (self.__class__.__name__, [bytes_to_str(self.value).upper()])

@classmethod
def freeze_deserialize(cls, args):
return cls(*[codecs.decode(x, "hex") for x in args])


# other candidates here: https://docs.microsoft.com/en-us/windows/win32/debug/pe-format#machine-types
ARCH_I386 = "i386"
Expand Down
Loading

0 comments on commit fb99ef5

Please sign in to comment.