Skip to content

Commit

Permalink
tests/extmod: Add deflate.DeflateIO tests.
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
  • Loading branch information
jimmo authored and dpgeorge committed Jul 21, 2023
1 parent 3533924 commit 8b315ef
Show file tree
Hide file tree
Showing 6 changed files with 557 additions and 0 deletions.
148 changes: 148 additions & 0 deletions tests/extmod/deflate_compress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
try:
# Check if deflate is available.
import deflate
import io
except ImportError:
print("SKIP")
raise SystemExit

# Check if compression is enabled.
if not hasattr(deflate.DeflateIO, "write"):
print("SKIP")
raise SystemExit

# Simple compression & decompression.
b = io.BytesIO()
g = deflate.DeflateIO(b, deflate.RAW)
data = b"micropython"
N = 10
for i in range(N):
g.write(data)
g.close()
result_raw = b.getvalue()
print(len(result_raw) < len(data) * N)
b = io.BytesIO(result_raw)
g = deflate.DeflateIO(b, deflate.RAW)
print(g.read())

# Same, but using a context manager.
b = io.BytesIO()
with deflate.DeflateIO(b, deflate.RAW) as g:
for i in range(N):
g.write(data)
result_raw = b.getvalue()
print(len(result_raw) < len(data) * N)
b = io.BytesIO(result_raw)
with deflate.DeflateIO(b, deflate.RAW) as g:
print(g.read())

# Writing to a closed underlying stream.
b = io.BytesIO()
g = deflate.DeflateIO(b, deflate.RAW)
g.write(b"micropython")
b.close()
try:
g.write(b"micropython")
except ValueError:
print("ValueError")

# Writing to a closed DeflateIO.
b = io.BytesIO()
g = deflate.DeflateIO(b, deflate.RAW)
g.write(b"micropython")
g.close()
try:
g.write(b"micropython")
except OSError:
print("OSError")


def decompress(data, *args):
buf = io.BytesIO(data)
with deflate.DeflateIO(buf, *args) as g:
return g.read()


def compress(data, *args):
b = io.BytesIO()
with deflate.DeflateIO(b, *args) as g:
g.write(data)
return b.getvalue()


def compress_error(data, *args):
try:
compress(data, *args)
except OSError:
print("OSError")
except ValueError:
print("ValueError")


# More test patterns.
PATTERNS_RAW = (
(b"0", b"3\x00\x00"),
(b"a", b"K\x04\x00"),
(b"0" * 100, b"3\xa0\x03\x00\x00"),
(
bytes(range(64)),
b"c`dbfaec\xe7\xe0\xe4\xe2\xe6\xe1\xe5\xe3\x17\x10\x14\x12\x16\x11\x15\x13\x97\x90\x94\x92\x96\x91\x95\x93WPTRVQUS\xd7\xd0\xd4\xd2\xd6\xd1\xd5\xd370426153\xb7\xb0\xb4\xb2\xb6\xb1\xb5\xb3\x07\x00",
),
)
for unpacked, packed in PATTERNS_RAW:
print(compress(unpacked) == packed)
print(compress(unpacked, deflate.RAW) == packed)

# Verify header and checksum format.
unpacked = b"hello"
packed = b"\xcbH\xcd\xc9\xc9\x07\x00"


def check_header(n, a, b):
if a == b:
print(n)
else:
print(n, a, b)


check_header("RAW", compress(unpacked, deflate.RAW), packed)
check_header(
"ZLIB(9)", compress(unpacked, deflate.ZLIB, 9), b"\x18\x95" + packed + b"\x06,\x02\x15"
)
check_header(
"ZLIB(15)", compress(unpacked, deflate.ZLIB, 15), b"\x78\x9c" + packed + b"\x06,\x02\x15"
)
check_header(
"GZIP",
compress(unpacked, deflate.GZIP, 9),
b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x04\x03" + packed + b"\x86\xa6\x106\x05\x00\x00\x00",
)

# Valid wbits values.
compress_error(unpacked, deflate.RAW, -1)
print(len(compress(unpacked, deflate.RAW, 0)))
compress_error(unpacked, deflate.RAW, 1)
compress_error(unpacked, deflate.RAW, 4)
for i in range(5, 16):
print(len(compress(unpacked, deflate.RAW, i)))
compress_error(unpacked, deflate.RAW, 16)

# Invalid values for format.
compress_error(unpacked, -1)
compress_error(unpacked, 5)

# Fill buf with a predictable pseudorandom sequence.
buf = bytearray(1024)
lfsr = 1 << 15 | 1
for i in range(len(buf)):
bit = (lfsr ^ (lfsr >> 1) ^ (lfsr >> 3) ^ (lfsr >> 12)) & 1
lfsr = (lfsr >> 1) | (bit << 15)
buf[i] = lfsr & 0xFF

# Verify that compression improves as the window size increases.
prev_len = len(buf)
for wbits in range(5, 10):
result = compress(buf, deflate.RAW, wbits)
next_len = len(result)
print(next_len < prev_len and decompress(result, deflate.RAW, wbits) == buf)
prev_len = next_len
41 changes: 41 additions & 0 deletions tests/extmod/deflate_compress.py.exp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
True
b'micropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropython'
True
b'micropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropythonmicropython'
ValueError
OSError
True
True
True
True
True
True
True
True
RAW
ZLIB(9)
ZLIB(15)
GZIP
ValueError
7
ValueError
ValueError
7
7
7
7
7
7
7
7
7
7
7
ValueError
ValueError
ValueError
False
True
True
True
True
174 changes: 174 additions & 0 deletions tests/extmod/deflate_decompress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
try:
# Check if deflate is available.
import deflate
import io
except ImportError:
print("SKIP")
raise SystemExit

# zlib.compress(b'micropython hello world hello world micropython', wbits=-9)
data_raw = b'\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00'
# zlib.compress(b'micropython hello world hello world micropython', wbits=9)
data_zlib = b'\x18\x95\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00\xbc\xfa\x12\x91'
# zlib.compress(b'micropython hello world hello world micropython', wbits=25)
data_gzip = b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcfS\xc8H\xcd\xc9\xc9W(\xcf/\xcaIAa\xe7"\xd4\x00\x00"\xeb\xc4\x98/\x00\x00\x00'

# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 5)
data_wbits_5 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\x95\xcfH\xcd\xc9\xc9\x07\x00"
# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 6)
data_wbits_6 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\xd5\x9f\x91\x9a\x93\x93\x0f\x00"
# compress(b'hello' + bytearray(300) + b'hello', format=deflate.RAW, 8)
data_wbits_8 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08\x88\xf5\x7fFjNN>\x00"
# compress(b'hello' + bytearray(2000) + b'hello', format=deflate.RAW, 10)
data_wbits_10 = b"\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08Fz\x18\x00\xc3`\xa4'\x03`2\x18\xe99\x01\x98\x13Fz\xfe\x07\xe6\xff\x91\x9e\xff\x81\xf9\x7f\xa4\xe7\x7f`\xfe\x1f\xba\xf9?#5''\x1f\x00"


def decompress(data, *args):
buf = io.BytesIO(data)
with deflate.DeflateIO(buf, *args) as g:
return g.read()


def decompress_error(data, *args):
try:
decompress(data, *args)
except OSError:
print("OSError")
except EOFError:
print("EOFError")
except ValueError:
print("ValueError")


# Basic handling of format and detection.
print(decompress(data_raw, deflate.RAW))
print(decompress(data_zlib, deflate.ZLIB))
print(decompress(data_gzip, deflate.GZIP))
print(decompress(data_zlib)) # detect zlib/gzip.
print(decompress(data_gzip)) # detect zlib/gzip.

decompress_error(data_raw) # cannot detect zlib/gzip from raw stream
decompress_error(data_raw, deflate.ZLIB)
decompress_error(data_raw, deflate.GZIP)
decompress_error(data_zlib, deflate.RAW)
decompress_error(data_zlib, deflate.GZIP)
decompress_error(data_gzip, deflate.RAW)
decompress_error(data_gzip, deflate.ZLIB)

# Invalid data stream.
decompress_error(b"abcef", deflate.RAW)

# Invalid block type. final-block, block-type=3.
decompress_error(b"\x07", deflate.RAW)

# Truncated stream.
decompress_error(data_raw[:10], deflate.RAW)

# Partial reads.
buf = io.BytesIO(data_zlib)
with deflate.DeflateIO(buf) as g:
print(buf.seek(0, 1)) # verify stream is not read until first read of the DeflateIO stream.
print(g.read(1))
print(buf.seek(0, 1)) # verify that only the minimal amount is read from the source
print(g.read(1))
print(buf.seek(0, 1))
print(g.read(2))
print(buf.seek(0, 1))
print(g.read())
print(buf.seek(0, 1))
print(g.read(1))
print(buf.seek(0, 1))
print(g.read())

# Invalid zlib checksum (+ length for gzip). Note: only checksum errors are
# currently detected, see the end of uzlib_uncompress_chksum().
decompress_error(data_zlib[:-4] + b"\x00\x00\x00\x00")
decompress_error(data_gzip[:-8] + b"\x00\x00\x00\x00\x00\x00\x00\x00")
decompress_error(data_zlib[:-4] + b"\x00\x00\x00\x00", deflate.ZLIB)
decompress_error(data_gzip[:-8] + b"\x00\x00\x00\x00\x00\x00\x00\x00", deflate.GZIP)

# Reading from a closed underlying stream.
b = io.BytesIO(data_raw)
g = deflate.DeflateIO(b, deflate.RAW)
g.read(4)
b.close()
try:
g.read(4)
except ValueError:
print("ValueError")

# Reading from a closed DeflateIO.
b = io.BytesIO(data_raw)
g = deflate.DeflateIO(b, deflate.RAW)
g.read(4)
g.close()
try:
g.read(4)
except OSError:
print("OSError")

# Gzip header with extra flags (FCOMMENT FNAME FEXTRA FHCRC) enabled.
data_gzip_header_extra = b"\x1f\x8b\x08\x1e}\x9a\x9bd\x02\x00\x00\x00\x00\x00\x00\xff\xcb\xcdL.\xca/\xa8,\xc9\xc8\xcf\x03\x00\xf2KF>\x0b\x00\x00\x00"
print(decompress(data_gzip_header_extra))

# Test patterns.
PATTERNS_ZLIB = [
# Packed results produced by CPy's zlib.compress()
(b"0", b"x\x9c3\x00\x00\x001\x001"),
(b"a", b"x\x9cK\x04\x00\x00b\x00b"),
(b"0" * 100, b"x\x9c30\xa0=\x00\x00\xb3q\x12\xc1"),
(
bytes(range(64)),
b"x\x9cc`dbfaec\xe7\xe0\xe4\xe2\xe6\xe1\xe5\xe3\x17\x10\x14\x12\x16\x11\x15\x13\x97\x90\x94\x92\x96\x91\x95\x93WPTRVQUS\xd7\xd0\xd4\xd2\xd6\xd1\xd5\xd370426153\xb7\xb0\xb4\xb2\xb6\xb1\xb5\xb3\x07\x00\xaa\xe0\x07\xe1",
),
(b"hello", b"x\x01\x01\x05\x00\xfa\xffhello\x06,\x02\x15"), # compression level 0
# adaptive/dynamic huffman tree
(
b"13371813150|13764518736|12345678901",
b"x\x9c\x05\xc1\x81\x01\x000\x04\x04\xb1\x95\\\x1f\xcfn\x86o\x82d\x06Qq\xc8\x9d\xc5X}<e\xb5g\x83\x0f\x89X\x07\xab",
),
# dynamic Huffman tree with "case 17" (repeat code for 3-10 times)
(
b">I}\x00\x951D>I}\x00\x951D>I}\x00\x951D>I}\x00\x951D",
b"x\x9c\x05\xc11\x01\x00\x00\x00\x010\x95\x14py\x84\x12C_\x9bR\x8cV\x8a\xd1J1Z)F\x1fw`\x089",
),
]
for unpacked, packed in PATTERNS_ZLIB:
print(decompress(packed) == unpacked)
print(decompress(packed, deflate.ZLIB) == unpacked)

# Older version's of CPython's zlib module still included the checksum and length (as if it were a zlib/gzip stream).
# Make sure there're no problem decompressing this.
data_raw_with_footer = data_raw + b"\x00\x00\x00\x00\x00\x00\x00\x00"
print(decompress(data_raw_with_footer, deflate.RAW))

# Valid wbits values.
decompress_error(data_wbits_5, deflate.RAW, -1)
print(len(decompress(data_wbits_5, deflate.RAW, 0)))
decompress_error(data_wbits_5, deflate.RAW, 1)
decompress_error(data_wbits_5, deflate.RAW, 4)
for i in range(5, 16):
print(len(decompress(data_wbits_5, deflate.RAW, i)))
decompress_error(data_wbits_5, deflate.RAW, 16)

# Invalid values for format.
decompress_error(data_raw, -1)
decompress_error(data_raw, 5)

# Data that requires a higher wbits value.
decompress_error(data_wbits_6, deflate.RAW, 5)
print(len(decompress(data_wbits_6, deflate.RAW, 6)))
print(len(decompress(data_wbits_6, deflate.RAW, 7)))
decompress_error(data_wbits_8, deflate.RAW, 7)
print(len(decompress(data_wbits_8, deflate.RAW, 8)))
print(len(decompress(data_wbits_8, deflate.RAW, 9)))
decompress_error(data_wbits_10, deflate.RAW)
decompress_error(data_wbits_10, deflate.RAW, 9)
print(len(decompress(data_wbits_10, deflate.RAW, 10)))

# zlib header sets the size, so works with wbits unset or wbits >= 10.
data_wbits_10_zlib = b"(\x91\xcbH\xcd\xc9\xc9g\x18\xe9\x00\x08Fz\x18\x00\xc3`\xa4'\x03`2\x18\xe99\x01\x98\x13Fz\xfe\x07\xe6\xff\x91\x9e\xff\x81\xf9\x7f\xa4\xe7\x7f`\xfe\x1f\xba\xf9?#5''\x1f\x00[\xbc\x04)"
print(len(decompress(data_wbits_10_zlib, deflate.ZLIB)))
decompress_error(data_wbits_10_zlib, deflate.ZLIB, 9)
print(len(decompress(data_wbits_10_zlib, deflate.ZLIB, 10)))
print(len(decompress(data_wbits_10_zlib)))
Loading

0 comments on commit 8b315ef

Please sign in to comment.