Reducing duplicate/redundant test cases? #622
-
I'm just starting off with BooFuzz and am trying to ensure that I'm not creating extraneous test cases since our target may be heavily rate-limited (1-10 requests/sec). As part of my sandbox testing, I've created a very simple fuzzing script to verify that I've got the basics working. class PackedChunk(FuzzableBlock):
"""Bit-packed chunk to ensure BitField instances aren't extended to full byte lengths
"""
def get_child_data(self, mutation_context):
"""Overrides reference data for this node.
Will perform bit-packing for any ``BitField`` children. All else are returned as-is.
Args:
mutation_context (MutationContext): Mutation context.
Returns:
bytes: Child data.
"""
packed = 0
bitlength = 0
for item in self.stack:
if isinstance(item, BitField):
packed = (packed << item.width) | int.from_bytes(
item.render(mutation_context=mutation_context), "big"
)
bitlength += item.width
else:
child = item.render(mutation_context=mutation_context)
packed = (packed << (len(child) * 8)) | int.from_bytes(child, "big")
bitlength += len(child) * 8
return int.to_bytes(packed, 2, "big")
# . . .
def test_packed_chunk(ip="127.0.0.1", port=8008, depth=-1, logging=False):
pc = PackedChunk(
"chunk1",
children=(
BitField(name="FlagGroup_1", width=3, full_range=True),
Byte(name="Int8_1", default_value=b"\x00"),
BitField(name="FlagGroup_2", width=5, full_range=True),
),
)
r = Request("tx", children=(pc,))
c = TCPSocketConnection(ip, port)
t = Target(connection=c)
s = Session(target=t, keep_web_open=False, fuzz_loggers=None if logging else [])
s.connect(r)
try:
s.fuzz(max_depth=depth if depth != -1 else ((len(pc.stack) - 1) or 1))
except KeyboardInterrupt:
pass
finally:
IOLoop.current().stop() # Kill web server at exit Executing the above with a depth of 3 yields 223,698 test cases. I expected up to 65,536 since there are only two bytes of data being fuzzed. So I dug into the test cases and noticed that a lot were duplicated but likely considered valid/unique given their ordering:
As we only have three |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 2 replies
-
Alright, so I've partially answered my own question. It's very rudimentary and needs optimization/tuning, but it can work as a proof of concept for what I need in my particular use case should someone believe it to be desirable for others as well. Duplicate test cases are a result of Lines 1484 to 1494 in 0cfcef5 The recursive functionality means that all possible combination will be checked and exhausted to determine valid test cases. This is probably necessary as the recursion works as a state of tracking without logging every path visited, plus current employment of generators ensures low memory consumption. So I've opted to not change much to this. My approach added two variables to
Specifying a minimum depth is only useful when fuzzing a full range of test cases. For example, in my question I demonstrated having only three primitives/fuzzables, each with a default value of 0. Default behavior of BooFuzz would start at a depth of 1 and increment when all combinations per depth are found. So a combinatorial depth of 1 where only Tracking only unique test cases poses a bit of a problem as we now have to include a method for determining what cases/paths we've already visited. We also need to detect when scrambled ordering matches our current state. For that, a Alright, enough babbling. Here's the custom Class that implements the changes: from boofuzz import Session
from boofuzz.mutation_context import MutationContext
class CustomSession(Session):
"""Overrides some functions of Session in an attempt to reduce duplicate named test cases"""
def fuzz(self, name=None, max_depth=None, min_depth=None, unique_only=False):
"""Fuzz the entire protocol tree.
Iterates through and fuzzes all fuzz cases, skipping according to
self.skip and restarting based on self.restart_interval.
If you want the web server to be available, your program must persist
after calling this method. helpers.pause_for_signal() is
available to this end.
Args:
name (str): Pass in a Request name to fuzz only a single request message. Pass in a test case name to fuzz
only a single test case.
max_depth (int): Maximum combinatorial depth; set to 1 for "simple" fuzzing.
min_depth (int): Minimum combinatorial depth
unique_only (bool): Do not execute test cases with the same mutation values as ones from before
Returns:
None
"""
self.total_mutant_index = 0
self.total_num_mutations = self.num_mutations(max_depth=max_depth)
if name is None or name == "":
self._main_fuzz_loop(
self._generate_mutations_indefinitely(
max_depth=max_depth, min_depth=min_depth, unique_only=unique_only
)
)
else:
self.fuzz_by_name(name=name)
def _generate_mutations_indefinitely(
self, max_depth=None, path=None, min_depth=None, unique_only=False
):
"""Yield MutationContext with n mutations per message over all messages, with n increasing indefinitely."""
depth = min_depth if min_depth is not None and min_depth > 0 else 1
while max_depth is None or depth <= max_depth:
valid_case_found_at_this_depth = False
for m in self._generate_n_mutations(
depth=depth, path=path, unique_only=unique_only
):
valid_case_found_at_this_depth = True
yield m
if not valid_case_found_at_this_depth:
break
depth += 1
def _generate_n_mutations(self, depth, path, unique_only=False):
"""Yield MutationContext with n mutations per message over all messages."""
for path in self._iterate_protocol_message_paths(path=path):
for m in self._generate_n_mutations_for_path(
path, depth=depth, unique_only=unique_only
):
yield m
def _generate_n_mutations_for_path(self, path, depth, unique_only=False):
"""Yield MutationContext with n mutations for a specific message.
Args:
path (list of Connection): Nodes (Requests) along the path to the current one being fuzzed.
depth (int): Yield sets of depth mutations.
Yields:
MutationContext: A MutationContext containing one mutation.
"""
if unique_only:
visited = set()
for mutations in self._generate_n_mutations_for_path_recursive(
path, depth=depth
):
if not self._mutations_contain_duplicate(mutations):
if unique_only:
testcase = frozenset(
f"{m.qualified_name}:{m.index}" for m in mutations
)
if testcase in visited:
continue
else:
visited.add(testcase)
self.total_mutant_index += 1
yield MutationContext(
message_path=path,
mutations={n.qualified_name: n for n in mutations},
)
def _mutations_contain_duplicate(self, mutations):
names = [m.qualified_name for m in mutations]
cases = set(names)
return not (len(names) == len(cases)) Generating all test cases takes roughly the same amount of time (using ipython magic function In terms of runtime performance, I have a test setup with a TCP Echo server as the target and console logging disabled while running on an i7-10810U (6C/12T): from argparse import ArgumentParser
from enum import unique
from datetime import datetime
from multiprocessing import Process
from socket import socket, AF_INET, SOCK_STREAM
from tornado.ioloop import IOLoop
from packing.messages import PackedChunk
from packing.sessions import CustomSession
def run_echo_server(ext_ip="127.0.0.1", port=8008):
sock = socket(AF_INET, SOCK_STREAM)
sock.bind((ext_ip, port))
sock.listen(1)
while True:
connection, client_address = sock.accept()
try:
while True:
data = connection.recv(16)
if data:
connection.sendall(data)
else:
break
connection.close()
except (ConnectionResetError, ConnectionAbortedError):
pass
finally:
try:
connection.close()
except:
pass
def test_packed_chunk(
ip="127.0.0.1",
port=8008,
max_depth=None,
min_depth=None,
logging=False,
unique_only=False,
):
p = Process(target=run_echo_server, args=(ip, port))
p.start()
b = PackedChunk(
"chunk1",
children=(
BitField(name="FlagGroup_1", width=3, full_range=True),
Byte(name="Int8_1", default_value=b"\x00", full_range=True),
BitField(name="FlagGroup_2", width=5, full_range=True),
),
)
r = Request("tx", children=(b,))
c = TCPSocketConnection(ip, port)
t = Target(connection=c)
s = CustomSession(
target=t,
keep_web_open=False,
fuzz_loggers=None if logging else [],
db_filename=f"./boofuzz-results/session-maxdepth-{max_depth}-mindepth-{min_depth}{'-unique-' if unique_only else ''}_{datetime.utcnow().replace(microsecond=0).isoformat().replace(':', '-')}.db",
)
s.connect(r)
try:
s.fuzz(max_depth=max_depth, min_depth=min_depth, unique_only=unique_only)
except KeyboardInterrupt:
pass
finally:
p.terminate()
IOLoop.current().stop()
if __name__ == "__main__":
agp = ArgumentParser()
agp.add_argument("-i", "--ip", default="127.0.0.1")
agp.add_argument("-p", "--port", default=8008, type=int)
agp.add_argument("--max-depth", type=int)
agp.add_argument("--min-depth", type=int)
agp.add_argument("-u", "--unique-only", action="store_true")
agp.add_argument(
"-l", "--logging", action="store_true", help="Enable console logging"
)
arg = agp.parse_args()
test_packed_chunk(
ip=arg.ip,
port=arg.port,
max_depth=arg.max_depth,
min_depth=arg.min_depth,
logging=arg.logging,
unique_only=arg.unique_only,
)
So this helps shave off an hour of runtime, even before we begin to add pre- and post-testcase callbacks. It's a good enough solution for the time being. |
Beta Was this translation helpful? Give feedback.
Alright, so I've partially answered my own question. It's very rudimentary and needs optimization/tuning, but it can work as a proof of concept for what I need in my particular use case should someone believe it to be desirable for others as well.
Duplicate test cases are a result of
Session._generate_n_mutations_for_path_recursive()
boofuzz/boofuzz/sessions.py
Lines 1484 to 1494 in 0cfcef5