Skip to content

Commit

Permalink
Generate basic block control flow-graphs for IL (#469)
Browse files Browse the repository at this point in the history
* update versions

* Start a CIL flow-graph generator

* CFG generator

* Update release notes
  • Loading branch information
tonybaloney authored Jan 1, 2022
1 parent a650888 commit aba275f
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 42 deletions.
8 changes: 4 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

## 1.2.2

* enable ASAN for Windows
* Map short branch equivalent and patch out 3 bytes to a nop
* Fix offset calculation
* Create a simple loop test and check for 1 byte signed size
* Added `pyjion.dis.flow_graph()` function to get a DOT control flow-graph of CIL basic blocks
* Added `pyjion.dis.cil_instructions()` function to get a list of CIL instructions from a compiled function
* Enable ASAN for Windows (compile-time option)
* CIL compiles to short branch opcodes when target is within 1 byte
* Show effective branch target on CIL disassembly output

## 1.2.1
Expand Down
12 changes: 11 additions & 1 deletion Tests/test_dis.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from pyjion.dis import print_il, dis, dis_native
from pyjion.dis import print_il, dis, dis_native, flow_graph
import pyjion
import sys
import pytest
Expand Down Expand Up @@ -27,6 +27,16 @@ def test_f():
assert "ldarg.1" in captured.out


def test_flow_graph():
    """Smoke test: flow_graph() returns DOT text for a simple function."""
    def test_f():
        # Small fixture: sums a tuple of constants.
        numbers = (1, 2, 3, 4)
        return sum(numbers)

    # Call the fixture before asking for its graph.
    # NOTE(review): presumably the call is what triggers compilation — confirm.
    assert test_f() == 10
    graph = flow_graph(test_f)
    # Only checks that the output is a DOT digraph, not its exact contents.
    assert "digraph" in graph


@pytest.mark.graph
@pytest.mark.nopgc # TODO : Resolve PGC error in dis module.
def test_dis_with_offsets(capsys):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

setup(
name='pyjion',
version='1.2.1',
version='1.2.2',
description='A JIT compiler wrapper for CPython',
author='Anthony Shaw',
author_email='anthonyshaw@apache.org',
Expand Down
2 changes: 1 addition & 1 deletion src/pyjion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from enum import IntFlag, IntEnum
from dataclasses import dataclass

__version__ = '1.2.1'
__version__ = '1.2.2'


def _no_dotnet(path):
Expand Down
175 changes: 140 additions & 35 deletions src/pyjion/dis.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from dis import get_instructions
from typing import Any, Dict, List, Optional, Set
from pyjion import il, native, offsets as get_offsets, symbols
from collections import namedtuple
from warnings import warn
import struct
from platform import machine
import dataclasses


__all__ = [
"dis",
Expand Down Expand Up @@ -369,7 +372,23 @@
OPDEF("CEE_UNUSED70", "unused", Pop0, Push0, InlineNone, IPrimitive, 2, 0xFE, 0x22, NEXT),
]

opcode_map = {}
@dataclasses.dataclass
class CILInstruction:
    """
    A single decoded CIL instruction.

    ``str()`` renders the instruction as it appears in a disassembly listing:
    the opcode name, followed by the operand when there is one, followed by
    the effective branch target as ``(IL_xxxx)`` when there is one.
    """
    offset: int  # IL byte offset at which the instruction starts
    opcode: OPDEF  # Opcode definition; its ``name`` is used for display
    argument: Optional[Any]  # Decoded operand, or None when there is no operand
    jump_offset: Optional[int]  # Effective IL offset of a branch target, or None

    def __str__(self):
        # Compare against None instead of relying on truthiness: an operand of
        # 0 (e.g. ``ldc.i4.s 0``) and a branch back to IL_0000 are both valid
        # values and must not be dropped from the listing.
        if self.jump_offset is not None:
            return f"{self.opcode.name} {self.argument} (IL_{self.jump_offset:04x})"
        if self.argument is not None:
            return f"{self.opcode.name} {self.argument}"
        return f"{self.opcode.name}"



opcode_map: Dict[int, OPDEF] = {}
for opcode in opcodes:
if opcode.first_byte == 0xFF:
# single byte opcode
Expand All @@ -378,109 +397,93 @@
opcode_map[opcode.first_byte + opcode.second_byte] = opcode


def print_il(il: bytearray, symbols, offsets=None, bytecodes=None, print_pc=True) -> None:
"""
Print the CIL sequence
:param il: A bytearray of ECMA 335 CIL
:param offsets: A dictionary of Python bytecode offsets
:param bytecodes: The dictionary of Python bytecode instructions
:param print_pc: Flag to include the PC offsets in the print
"""
def cil_instructions(il, symbols) -> List[CILInstruction]:
i = iter(il)
instructions: List[CILInstruction] = []
try:
pc = 0
while True:
# See if this is the offset of a matching Python instruction
if offsets and bytecodes:
for py_offset, il_offset, native_offset, offset_type in offsets:
if il_offset == pc and offset_type == 'instruction':
try:
instruction = bytecodes[py_offset]
print(f'// {instruction.offset} {instruction.opname} - {instruction.arg} ({instruction.argval})', )
except KeyError:
warn("Invalid offset {0}".format(offsets))
first = next(i)
if first == 0 and pc == 0:
raise NotImplementedError(f"CorILMethod_FatFormat not yet supported")

op = opcode_map[first]
pc_label = f"IL_{pc:04x}: " if print_pc else ""
if op.size == InlineNone:
print(f"{pc_label}{op.name}")
if op.cee_code != "CEE_NOP":
instructions.append(CILInstruction(pc, op, None, None))
pc += 1
continue
elif op.size == ShortInlineBrTarget:
target = int.from_bytes((next(i),), byteorder='little', signed=True)
effective_target = (pc + 2) + target # What is the actual destination address
print(f"{pc_label}{op.name} {target} (IL_{effective_target:04x})")
instructions.append(CILInstruction(pc, op, target, effective_target))
pc += 2
continue
elif op.size == ShortInlineVar:
target = int.from_bytes((next(i),), byteorder='little', signed=True)
print(f"{pc_label}{op.name} {target}")
instructions.append(CILInstruction(pc, op, target, None))
pc += 2
continue
elif op.size == ShortInlineI:
target = int.from_bytes((next(i),), byteorder='little', signed=True)
print(f"{pc_label}{op.name} {target}")
instructions.append(CILInstruction(pc, op, target, None))
pc += 2
continue
elif op.size == ShortInlineR:
target = int.from_bytes((next(i), next(i), next(i), next(i)), byteorder='little', signed=True)
print(f"{pc_label}{op.name} {target}")
instructions.append(CILInstruction(pc, op, target, None))
pc += 5
continue
elif op.size == InlineBrTarget:
target = int.from_bytes((next(i), next(i), next(i), next(i)), byteorder='little', signed=True)
effective_target = (pc + 5) + target # What is the actual destination address
print(f"{pc_label}{op.name} {target} (IL_{effective_target:04x})")
instructions.append(CILInstruction(pc, op, target, effective_target))
pc += 5
continue
elif op.size == InlineField:
field = int.from_bytes((next(i), next(i), next(i), next(i)), byteorder='little', signed=True)
print(f"{pc_label}{op.name} {field}")
instructions.append(CILInstruction(pc, op, field, None))
pc += 5
continue
elif op.size == InlineR:
[target] = struct.unpack('f', bytes((next(i), next(i), next(i), next(i))))
print(f"{pc_label}{op.name} {target}")
instructions.append(CILInstruction(pc, op, target, None))
pc += 5
continue
elif op.size == InlineI:
target = int.from_bytes((next(i), next(i), next(i), next(i)), byteorder='little', signed=True)
print(f"{pc_label}{op.name} {target}")
instructions.append(CILInstruction(pc, op, target, None))
pc += 5
continue
elif op.size == InlineI8:
target = int.from_bytes((next(i), next(i), next(i), next(i), next(i), next(i), next(i), next(i)), byteorder='little', signed=True)
print(f"{pc_label}{op.name} {target}")
instructions.append(CILInstruction(pc, op, target, None))
pc += 9
continue
elif op.size == InlineMethod:
target = int.from_bytes((next(i), next(i), next(i), next(i)), byteorder='little', signed=True)
meth = symbols.get(target, target)
print(f"{pc_label}{op.name} {meth}")
instructions.append(CILInstruction(pc, op, meth, None))
pc += 5
continue
elif op.size == InlineSig:
target = int.from_bytes((next(i), next(i), next(i), next(i)), byteorder='little', signed=True)
print(f"{pc_label}{op.name} {target}")
instructions.append(CILInstruction(pc, op, target, None))
pc += 5
continue
elif op.size == InlineTok:
target = int.from_bytes((next(i), next(i), next(i), next(i)), byteorder='little', signed=True)
print(f"{pc_label}{op.name} {target}")
instructions.append(CILInstruction(pc, op, target, None))
pc += 5
continue
elif op.size == InlineString:
target = bytearray((next(i), next(i), next(i), next(i))).decode('utf-8')
print(f"{pc_label}{op.name} {target}")
instructions.append(CILInstruction(pc, op, target, None))
pc += 5
continue
elif op.size == InlineVar:
target = int.from_bytes((next(i), next(i)), byteorder='little', signed=True)
print(f"{pc_label}{op.name} {target}")
instructions.append(CILInstruction(pc, op, target, None))
pc += 3
continue
else:
Expand All @@ -489,6 +492,108 @@ def print_il(il: bytearray, symbols, offsets=None, bytecodes=None, print_pc=True
except StopIteration:
pass

return instructions


def print_il(il: bytearray, symbols, offsets=None, bytecodes=None, print_pc=True) -> None:
    """
    Print the CIL sequence

    When both ``offsets`` and ``bytecodes`` are given, each CIL instruction that
    starts a Python instruction is preceded by a ``// ...`` comment line
    describing that Python instruction.

    :param il: A bytearray of ECMA 335 CIL
    :param symbols: Mapping of method tokens to symbol names, passed through to ``cil_instructions``
    :param offsets: Sequence of ``(py_offset, il_offset, native_offset, offset_type)`` tuples
    :param bytecodes: The dictionary of Python bytecode instructions, keyed by Python bytecode offset
    :param print_pc: Flag to include the PC offsets in the print
    """
    instructions = cil_instructions(il, symbols)

    # Index the offsets table once by IL offset rather than rescanning the
    # whole table for every CIL instruction. Lists keep the table's order, so
    # the output for several entries at one IL offset is unchanged.
    python_offsets_by_il: Dict[int, List[int]] = {}
    if offsets and bytecodes:
        for py_offset, il_offset, _, offset_type in offsets:
            if offset_type == 'instruction':
                python_offsets_by_il.setdefault(il_offset, []).append(py_offset)

    for instruction in instructions:
        # See if this is the offset of a matching Python instruction
        for py_offset in python_offsets_by_il.get(instruction.offset, ()):
            try:
                python_instruction = bytecodes[py_offset]
            except KeyError:
                warn("Invalid offset {0}".format(offsets))
            else:
                print(f'// {python_instruction.offset} {python_instruction.opname} - {python_instruction.arg} ({python_instruction.argval})', )

        pc_label = f"IL_{instruction.offset:04x}: " if print_pc else ""
        print(f"{pc_label}{instruction}")


def flow_graph(f):
    """
    Return a control flow-graph in DOT syntax for the CIL instructions for f

    Each basic block becomes one ``record`` node named ``block_xxxx`` (the IL
    offset of its first instruction), with one ``ILxxxx`` port per instruction.
    Edges run from the port of the branching (or falling-through) instruction
    to the first port of the destination block.

    :param f: The compiled function or code object
    :returns: The Graph in DOT format, or ``None`` (after printing a notice)
        when no IL is available for f
    :rtype: ``str``
    """
    _il = il(f)
    if not _il:
        print("No IL for this function, it may not have compiled correctly.")
        return None
    instructions = cil_instructions(_il, symbols(f))
    parts = ["""
digraph g {
graph [
rankdir = "LR"
];
node [
fontsize = "16"
shape = "ellipse"
];
edge [
];\n
"""]
    block_starts: Set[int] = {0}
    if instructions:
        # The first listed instruction always opens a block, even if it does
        # not sit at offset 0; otherwise it would belong to no block at all.
        block_starts.add(instructions[0].offset)
    block_jumps = []  # list of tuples (from, to), in discovery order
    jump_to_block = {}  # offset of an edge's source instruction -> its block name

    # Compile a list of basic block starts
    for idx, instruction in enumerate(instructions):
        # `is None`, not falsiness: a branch back to IL offset 0 is a real jump.
        if instruction.jump_offset is None:
            continue
        block_starts.add(instruction.jump_offset)
        block_jumps.append((instruction.offset, instruction.jump_offset))
        # Anything other than an unconditional branch can also fall through to
        # the next instruction, which therefore starts a block of its own.
        # Guard the lookahead so a trailing branch cannot raise IndexError.
        falls_through = instruction.opcode.cee_code not in ["CEE_BR", "CEE_BR_S"]
        if falls_through and idx + 1 < len(instructions):
            next_offset = instructions[idx + 1].offset
            block_starts.add(next_offset)
            block_jumps.append((instruction.offset, next_offset))

    in_block = False
    cur_block = None
    labels = []
    for idx, instruction in enumerate(instructions):
        if instruction.offset in block_starts:
            if in_block:
                # Close the previous block's record node.
                parts.append("label = \"" + ' | '.join(labels) + "\"\n")
                labels.clear()
                parts.append('shape = "record"\n];\n')
                # Add fall-through jumps
                previous = instructions[idx - 1]
                if previous.opcode.size not in [InlineBrTarget, ShortInlineBrTarget]:
                    if (previous.offset, instruction.offset) not in block_jumps:
                        block_jumps.append((previous.offset, instruction.offset))
                        jump_to_block[previous.offset] = cur_block

            parts.append(f'"block_{instruction.offset:04x}" [\n')
            in_block = True
            cur_block = f"block_{instruction.offset:04x}"
        if instruction.jump_offset is not None:
            # Remember which block owns this branch so its edge can be anchored.
            jump_to_block[instruction.offset] = cur_block

        labels.append(f"<IL{instruction.offset:04x}> {instruction.offset:04x} : {instruction}")

    if in_block:
        # Close the final block.
        parts.append("label = \"" + ' | '.join(labels) + "\"\n")
        labels.clear()
        parts.append('shape = "record"\n];\n')

    for from_, to in block_jumps:
        resolved_block = jump_to_block[from_]
        parts.append(f'{resolved_block}:IL{from_:04x} -> "block_{to:04x}":IL{to:04x};\n')

    parts.append("\n}\n")
    return "".join(parts)


def dis(f, include_offsets=False, print_pc=True):
"""
Expand Down

0 comments on commit aba275f

Please sign in to comment.