
Merge pull request #141 from martindurant/hadoop
readd hadoop stream de/comp classes
martindurant authored May 23, 2024
2 parents c6d2a7d + d0ca9e5 commit 3a355aa
Showing 2 changed files with 69 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/snappy/__init__.py
@@ -9,6 +9,8 @@
    StreamCompressor,
    StreamDecompressor,
    UncompressError,
    HadoopStreamCompressor,
    HadoopStreamDecompressor,
    isValidCompressed,
)

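With this export in place, both classes become importable from the package root; a minimal sketch, assuming the package is installed as `snappy`:

    from snappy import HadoopStreamCompressor, HadoopStreamDecompressor, UncompressError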
67 changes: 67 additions & 0 deletions src/snappy/snappy.py
@@ -206,6 +206,73 @@ def copy(self):
        return self


class HadoopStreamCompressor:
    def add_chunk(self, data: bytes, compress=None):
        """Add a chunk, returning a bytes object that is framed and compressed.

        Outputs a single hadoop-snappy frame: the uncompressed length and the
        compressed length of the chunk, each as a 4-byte big-endian integer,
        followed by the compressed data. Unlike the snappy framing format,
        there is no stream header chunk.
        """
        # "compress" is unused; it is kept for signature compatibility with
        # StreamCompressor.add_chunk.
        cdata = _compress(data)
        return b"".join((len(data).to_bytes(4, "big"), len(cdata).to_bytes(4, "big"), cdata))

    compress = add_chunk

    def flush(self):
        # never maintains a buffer, so there is nothing to flush
        return b""

    def copy(self):
        """This method exists for compatibility with the zlib compressobj.
        """
        return self
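To illustrate the framing above, a minimal sketch, assuming the package is installed as `snappy` and exports the class as shown in the src/snappy/__init__.py diff:

    import snappy

    c = snappy.HadoopStreamCompressor()
    frame = c.add_chunk(b"hello hadoop")
    # the 4-byte big-endian uncompressed length comes first...
    assert int.from_bytes(frame[:4], "big") == len(b"hello hadoop")
    # ...then the 4-byte compressed length, then the compressed payload
    assert len(frame) == 8 + int.from_bytes(frame[4:8], "big")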


class HadoopStreamDecompressor:
    def __init__(self):
        self.remains = b""

    @staticmethod
    def check_format(data):
        """Check that there are enough bytes for a hadoop frame header.

        We cannot actually determine whether the data is really hadoop-snappy.
        """
        if len(data) < 8:
            raise UncompressError("Too short data length")
        # header layout: 4-byte uncompressed length, then 4-byte compressed
        # length; the parsed value is not validated further
        chunk_length = int.from_bytes(data[4:8], "big")

    def decompress(self, data: bytes):
        """Decompress 'data', returning a bytes object containing the
        uncompressed data corresponding to at least part of the input. This
        output should be concatenated to the output produced by any preceding
        calls to the decompress() method. Some of the input data may be
        preserved in an internal buffer for later processing.
        """
        if self.remains:
            data = self.remains + data
            self.remains = b""
        if len(data) < 8:
            self.remains = data
            return b""
        out = []
        while True:
            # each frame: 4-byte uncompressed length, 4-byte compressed
            # length, then the compressed payload
            chunk_length = int.from_bytes(data[4:8], "big")
            if len(data) < 8 + chunk_length:
                # incomplete frame; keep it for the next call
                self.remains = data
                break
            out.append(_uncompress(data[8:8 + chunk_length]))
            data = data[8 + chunk_length:]
        return b"".join(out)

    def flush(self):
        return b""

    def copy(self):
        return self
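To show the buffering behaviour, a minimal round-trip sketch, reusing the hypothetical `frame` from the compressor example above:

    d = snappy.HadoopStreamDecompressor()
    out = d.decompress(frame[:5])   # shorter than a full header: buffered, nothing emitted
    assert out == b""
    out += d.decompress(frame[5:])  # the remainder completes the frame
    assert out == b"hello hadoop"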



def stream_compress(src,
                    dst,
                    blocksize=_STREAM_TO_STREAM_BLOCK_SIZE,
