diff --git a/src/snappy/__init__.py b/src/snappy/__init__.py
index 9b6c9b4..e7e83e3 100644
--- a/src/snappy/__init__.py
+++ b/src/snappy/__init__.py
@@ -9,6 +9,8 @@
     StreamCompressor,
     StreamDecompressor,
     UncompressError,
+    HadoopStreamCompressor,
+    HadoopStreamDecompressor,
     isValidCompressed,
 )
diff --git a/src/snappy/snappy.py b/src/snappy/snappy.py
index b4a6284..aa1a22e 100644
--- a/src/snappy/snappy.py
+++ b/src/snappy/snappy.py
@@ -206,6 +206,73 @@ def copy(self):
         return self
 
 
+class HadoopStreamCompressor():
+    def add_chunk(self, data: bytes, compress=None):
+        """Add a chunk, returning a string that is framed and compressed.
+
+        Outputs a single hadoop-snappy block: a 4-byte big-endian uncompressed
+        length, a 4-byte big-endian compressed length, then the compressed data.
+        """
+        cdata = _compress(data)
+        return b"".join((len(data).to_bytes(4, "big"), len(cdata).to_bytes(4, "big"), cdata))
+
+    compress = add_chunk
+
+    def flush(self):
+        # this compressor never maintains a buffer
+        return b""
+
+    def copy(self):
+        """This method exists for compatibility with the zlib compressobj.
+        """
+        return self
+
+
+class HadoopStreamDecompressor():
+    def __init__(self):
+        self.remains = b""
+
+    @staticmethod
+    def check_format(data):
+        """Checks that there are enough bytes for a hadoop header.
+
+        We cannot actually determine if the data is really hadoop-snappy.
+        """
+        if len(data) < 8:
+            raise UncompressError("Too short data length")
+        chunk_length = int.from_bytes(data[4:8], "big")  # first block's compressed length
+
+    def decompress(self, data: bytes):
+        """Decompress 'data', returning a string containing the uncompressed
+        data corresponding to at least part of the data in string. This data
+        should be concatenated to the output produced by any preceding calls to
+        the decompress() method. Some of the input data may be preserved in
+        internal buffers for later processing.
+        """
+        if self.remains:
+            data = self.remains + data
+            self.remains = b""
+        if len(data) < 8:
+            self.remains = data
+            return b""
+        out = []
+        while True:
+            chunk_length = int.from_bytes(data[4:8], "big")
+            if len(data) < 8 + chunk_length:
+                self.remains = data
+                break
+            out.append(_uncompress(data[8:8 + chunk_length]))
+            data = data[8 + chunk_length:]
+        return b"".join(out)
+
+    def flush(self):
+        return b""
+
+    def copy(self):
+        return self
+
+
 def stream_compress(src, dst, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE,
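
Not part of the patch: a round-trip sketch of how the new classes are intended to be used, assuming this diff is applied and the two classes are exported from the snappy package as shown in the __init__.py hunk. Each add_chunk() call emits one self-contained hadoop-snappy block (4-byte big-endian uncompressed length, 4-byte big-endian compressed length, compressed payload), and the decompressor buffers incomplete blocks across decompress() calls.

import snappy

comp = snappy.HadoopStreamCompressor()
decomp = snappy.HadoopStreamDecompressor()

payload = b"hello hadoop-snappy " * 100

# Two independent blocks; unlike the framing format there is no stream header.
framed = comp.add_chunk(payload) + comp.add_chunk(payload)

# Feed the stream in arbitrary slices; partial blocks are kept in `remains`
# until the rest of their bytes arrive.
out = b""
for i in range(0, len(framed), 7):
    out += decomp.decompress(framed[i:i + 7])

assert out == payload * 2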