Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Property-based tests with Hypothesis #79

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ build
dist
*.egg-info/
doc/_build
.hypothesis/
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ python:
- pypy3
install:
- travis_retry pip install -r requirements.txt
- travis_retry pip install -r requirements-dev.txt
- travis_retry pip install coveralls
script:
- coverage run --source=jsonpointer tests.py
Expand All @@ -22,8 +23,6 @@ addons:
apt:
packages:
- pandoc
before_deploy:
- pip install -r requirements-dev.txt
deploy:
provider: pypi
user: skoegl
Expand Down
209 changes: 86 additions & 123 deletions jsonpatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,7 @@ def from_diff(cls, src, dst, optimization=True):
True
"""

builder = DiffBuilder()
builder._compare_values('', None, src, dst)
builder = DiffBuilder(src, dst)
ops = list(builder.execute())
return cls(ops)

Expand Down Expand Up @@ -625,134 +624,75 @@ def apply(self, obj):

class DiffBuilder(object):

def __init__(self):
self.index_storage = [{}, {}]
self.index_storage2 = [[], []]
self.__root = root = []
root[:] = [root, root, None]
def __init__(self, src, dst):
self.src = src
self.dst = dst

def store_index(self, value, index, st):
try:
storage = self.index_storage[st]
stored = storage.get(value)
if stored is None:
storage[value] = [index]
else:
storage[value].append(index)
self.ops = []
self.index_changes = {}

except TypeError:
self.index_storage2[st].append((value, index))
def insert(self, op):
self.ops.append(op)

def take_index(self, value, st):
try:
stored = self.index_storage[st].get(value)
if stored:
return stored.pop()
def execute(self):
self._compare_values('', None, self.src, self.dst)
return [op.operation for op in self.ops]

except TypeError:
storage = self.index_storage2[st]
for i in range(len(storage)-1, -1, -1):
if storage[i][0] == value:
return storage.pop(i)[1]
def _item_added(self, path, key, item, is_sequence):
if is_sequence:
key = self.get_index_change(path, key)

def insert(self, op):
root = self.__root
last = root[0]
last[1] = root[0] = [last, root, op]
return root[0]

def remove(self, index):
link_prev, link_next, _ = index
link_prev[1] = link_next
link_next[0] = link_prev
index[:] = []

def iter_from(self, start):
root = self.__root
curr = start[1]
while curr is not root:
yield curr[2]
curr = curr[1]
new_op = AddOperation({
'op': 'add',
'path': _path_join(path, key),
'value': item,
})
self.insert(new_op)

def __iter__(self):
root = self.__root
curr = root[1]
while curr is not root:
yield curr[2]
curr = curr[1]
if is_sequence:
self.add_index_change(path, key, +1)

def execute(self):
root = self.__root
curr = root[1]
while curr is not root:
if curr[1] is not root:
op_first, op_second = curr[2], curr[1][2]
if op_first.location == op_second.location and \
type(op_first) == RemoveOperation and \
type(op_second) == AddOperation:
yield ReplaceOperation({
'op': 'replace',
'path': op_second.location,
'value': op_second.operation['value'],
}).operation
curr = curr[1][1]
continue
def _item_moved(self, path, key, item, newkey):
""" The `item` was moved from `key` to `newkey`

[a, b, c, d, e]
0 1, 2, 3, 4

case 1: move 1 to 3
[a, c, d, b, e]
0 1, 2, 3, 4

"""

key = self.get_index_change(path, key)
newkey = self.get_index_change(path, newkey)

if key == newkey:
return

new_op = MoveOperation({
'op': 'move',
'from': _path_join(path, key),
'path': _path_join(path, newkey),
})
self.insert(new_op)

self.add_index_change(path, key+1, -1)
self.add_index_change(path, newkey+1, +1)


def _item_removed(self, path, key, item, is_sequence):
if is_sequence:
key = self.get_index_change(path, key)

yield curr[2].operation
curr = curr[1]

def _item_added(self, path, key, item):
index = self.take_index(item, _ST_REMOVE)
if index is not None:
op = index[2]
if type(op.key) == int:
for v in self.iter_from(index):
op.key = v._on_undo_remove(op.path, op.key)

self.remove(index)
if op.location != _path_join(path, key):
new_op = MoveOperation({
'op': 'move',
'from': op.location,
'path': _path_join(path, key),
})
self.insert(new_op)
else:
new_op = AddOperation({
'op': 'add',
'path': _path_join(path, key),
'value': item,
})
new_index = self.insert(new_op)
self.store_index(item, new_index, _ST_ADD)

def _item_removed(self, path, key, item):
new_op = RemoveOperation({
'op': 'remove',
'path': _path_join(path, key),
})
index = self.take_index(item, _ST_ADD)
new_index = self.insert(new_op)
if index is not None:
op = index[2]
if type(op.key) == int:
for v in self.iter_from(index):
op.key = v._on_undo_add(op.path, op.key)

self.remove(index)
if new_op.location != op.location:
new_op = MoveOperation({
'op': 'move',
'from': new_op.location,
'path': op.location,
})
new_index[2] = new_op
self.insert(new_op)

else:
self.remove(new_index)

else:
self.store_index(item, new_index, _ST_REMOVE)
if is_sequence:
self.add_index_change(path, key, -1)

def _item_replaced(self, path, key, item):
self.insert(ReplaceOperation({
Expand All @@ -768,10 +708,10 @@ def _compare_dicts(self, path, src, dst):
removed_keys = src_keys - dst_keys

for key in removed_keys:
self._item_removed(path, str(key), src[key])
self._item_removed(path, str(key), src[key], False)

for key in added_keys:
self._item_added(path, str(key), dst[key])
self._item_added(path, str(key), dst[key], False)

for key in src_keys & dst_keys:
self._compare_values(path, key, src[key], dst[key])
Expand All @@ -783,6 +723,7 @@ def _compare_lists(self, path, src, dst):
for key in range(max_len):
if key < min_len:
old, new = src[key], dst[key]

if old == new:
continue

Expand All @@ -795,14 +736,14 @@ def _compare_lists(self, path, src, dst):
self._compare_lists(_path_join(path, key), old, new)

else:
self._item_removed(path, key, old)
self._item_added(path, key, new)
self._item_removed(path, key, old, True)
self._item_added(path, key, new, True)

elif len_src > len_dst:
self._item_removed(path, len_dst, src[key])
self._item_removed(path, len_dst, src[key], True)

else:
self._item_added(path, key, dst[key])
self._item_added(path, key, dst[key], True)

def _compare_values(self, path, key, src, dst):
if src == dst:
Expand All @@ -819,6 +760,28 @@ def _compare_values(self, path, key, src, dst):
else:
self._item_replaced(path, key, dst)

def add_index_change(self, path, key, change):
""" Store the index change of a certain path

[a, b, c]
0, 1, 2

When inserting d at position 1, the indexes of b and c change
[a, d, b, c]
0, 1, 2, 3

add_index_change('/', 1, +1)
"""

path_changes = self.index_changes.get(path, {})
key_change = path_changes.get(key, 0)
key_change = key_change + change
path_changes[key] = key_change

def get_index_change(self, path, key):
path_changes = self.index_changes.get(path, {})
return path_changes.get(key, 0) + key


def _path_join(path, key):
if key is None:
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
wheel
pypandoc
hypothesis>=3; python_version=="2.7" or python_version>"3.3"
38 changes: 38 additions & 0 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@
import jsonpatch
import jsonpointer
import sys
import string

try:
import hypothesis
from hypothesis import strategies as st
except ImportError:
hypothesis = None

try:
str = unicode # Python 2
except NameError:
pass


class ApplyPatchTestCase(unittest.TestCase):
Expand Down Expand Up @@ -608,6 +620,30 @@ def test_replace_missing(self):
self.assertRaises(jsonpatch.JsonPatchConflict, jsonpatch.apply_patch, src, patch_obj)


if hypothesis is not None:
json_st = st.recursive(
st.one_of([
st.none(),
st.booleans(),
st.floats(allow_nan=False),
st.text(string.printable)]),
lambda children:
st.lists(children)
| st.dictionaries(st.text(string.printable), children))

class RoundtripTests(unittest.TestCase):
@hypothesis.example({}, {str('%20'): None})
@hypothesis.example({str('%20'): None}, {})
@hypothesis.given(json_st, json_st)
def test_roundtrip(self, src, dst):
patch = jsonpatch.JsonPatch.from_diff(src, dst, False)
hypothesis.note('Patch: %s' % (patch,))
res = patch.apply(src)
message = '{src} + {patch} resulted in {res}; {dst} was expected'.format(
src=repr(src), patch=repr(patch), res=repr(res), dst=repr(dst))
self.assertEqual(res, dst, message)


if __name__ == '__main__':
modules = ['jsonpatch']

Expand All @@ -622,6 +658,8 @@ def get_suite():
suite.addTest(unittest.makeSuite(InvalidInputTests))
suite.addTest(unittest.makeSuite(ConflictTests))
suite.addTest(unittest.makeSuite(OptimizationTests))
if hypothesis is not None:
suite.addTest(unittest.makeSuite(RoundtripTests))
return suite


Expand Down