-
Notifications
You must be signed in to change notification settings - Fork 135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Decompress util #244
base: develop
Are you sure you want to change the base?
Decompress util #244
Changes from 5 commits
48293bf
d62ba69
78338cf
ac3bdbf
f0b786d
758c1b6
837737c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,9 @@ | |
|
||
from __future__ import unicode_literals | ||
|
||
import bz2 | ||
import cgi | ||
import gzip | ||
import mimetypes | ||
import os | ||
import tempfile | ||
|
@@ -27,6 +29,11 @@ | |
except ImportError: | ||
from urllib.parse import urlparse # Python 3 | ||
|
||
try: | ||
import lzma | ||
except ImportError: | ||
lzma = None | ||
|
||
try: | ||
import magic | ||
except ImportError: | ||
|
@@ -158,14 +165,7 @@ def plugin_name_by_mime_type(mime_type, mime_name, file_extension): | |
None) | ||
|
||
|
||
def detect_local_source(path, content, mime_type=None, encoding=None): | ||
|
||
# TODO: may add sample_size | ||
|
||
filename = os.path.basename(path) | ||
parts = filename.split('.') | ||
extension = parts[-1] if len(parts) > 1 else None | ||
|
||
def describe_file_type(filename, content, mime_type=None, encoding=None): | ||
if magic is not None: | ||
detected = magic.detect_from_content(content) | ||
encoding = detected.encoding or encoding | ||
|
@@ -177,6 +177,19 @@ def detect_local_source(path, content, mime_type=None, encoding=None): | |
mime_name = None | ||
mime_type = mime_type or mimetypes.guess_type(filename)[0] | ||
|
||
return mime_type, encoding, mime_name | ||
|
||
def detect_local_source(path, content, mime_type=None, encoding=None): | ||
# TODO: may add sample_size | ||
|
||
filename = os.path.basename(path) | ||
parts = filename.split('.') | ||
extension = parts[-1] if len(parts) > 1 else None | ||
|
||
args = (filename, content) | ||
kwargs = dict(mime_type=mime_type, encoding=encoding) | ||
mime_type, encoding, mime_name = describe_file_type(*args, **kwargs) | ||
|
||
plugin_name = plugin_name_by_mime_type(mime_type, mime_name, extension) | ||
if encoding == 'binary': | ||
encoding = None | ||
|
@@ -297,3 +310,53 @@ def export_to_uri(table, uri, *args, **kwargs): | |
raise ValueError('Plugin (export) "{}" not found'.format(plugin_name)) | ||
|
||
return export_function(table, uri, *args, **kwargs) | ||
|
||
|
||
def decompress(path, **kwargs):
    """
    Given a bz2, gzip or lzma file returns a decompressed file object.

    The compression format is chosen from the MIME type reported by
    `describe_file_type`. When that MIME type is shared by more than one
    format (for example `application/octet-stream`), the leading bytes of the
    file are used to pick the right one.

    All kwargs are passed to either `bz2.open`, `gzip.open` or `lzma.open`.
    Unless a `mode` is given in kwargs those functions open the file in
    binary mode, so reading yields bytes.

    The returned file object is open; the caller is responsible for closing
    it (it can be used as a context manager).

    :param path: (str) path to a bz2, gzip or lzma file
    :return: an open file object that yields the decompressed data
    :raises RuntimeError: if the MIME type is not one of the supported ones,
        or it is an lzma/xz file and the `lzma` module isn't available
    """
    # NOTE(review): a reviewer pointed out that the current API accepts
    # filenames or file-objects; this function only handles a path - confirm
    # whether file objects should be supported here as well.
    filename = os.path.basename(path)

    # The beginning of the file is enough to identify a compressed stream, so
    # avoid loading a potentially huge file into memory just to detect it.
    sample_size = 8192
    with open(path, 'rb') as handler:
        sample = handler.read(sample_size)
    mime_type = describe_file_type(filename, sample)[0]

    bz2_mime_types = (
        'application/bzip2',
        'application/octet-stream',
        'application/x-bz2',
        'application/x-bzip',
        'application/x-compressed',
        'application/x-stuffit'
    )
    gzip_mime_types = (
        'application/gzip',
        'application/x-gzip',
        'application/x-gunzip',
        'application/gzipped',
        'application/gzip-compressed',
        'application/x-compressed',
        'application/x-compress',
        'gzip/document',
        'application/octet-stream'
    )
    lzma_mime_types = (
        'application/x-xz',
        'application/x-lzma'
    )
    # NOTE(review): a reviewer suggested MIME type detection may not be
    # needed here, since callers are expected to know the file is compressed
    # with a supported algorithm; detection could live in the command-line
    # interface instead.

    # `bz2.open` only exists on Python 3; `bz2.BZ2File` works on both.
    bz2_open = getattr(bz2, 'open', bz2.BZ2File)

    # (accepted MIME types, leading magic bytes or None, opener)
    formats = [
        (bz2_mime_types, b'BZh', bz2_open),
        (gzip_mime_types, b'\x1f\x8b', gzip.open),
    ]
    if lzma is not None:
        formats.append((lzma_mime_types, None, lzma.open))

    candidates = [
        (signature, opener)
        for mime_types, signature, opener in formats
        if mime_type in mime_types
    ]
    if not candidates:
        msg = "Couldn't identify file mimetype, or lzma module isn't available"
        raise RuntimeError(msg)

    # Later formats take precedence by default; when the MIME type is
    # ambiguous, trust the file signature instead of blindly picking one.
    open_compressed = candidates[-1][1]
    if len(candidates) > 1:
        for signature, opener in candidates:
            if signature and sample.startswith(signature):
                open_compressed = opener
                break

    # Return the handle itself: wrapping this call in a `with` block would
    # close the file before the caller has a chance to read from it.
    return open_compressed(path, **kwargs)
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's very important to ensure the file object returned is open in binary mode (so the plugins will decode the data using the desired encoding). Could you please add a test for this case? |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,9 +17,19 @@ | |
|
||
from __future__ import unicode_literals | ||
|
||
import bz2 | ||
import gzip | ||
import os | ||
import tempfile | ||
import unittest | ||
|
||
try: | ||
import lzma | ||
except ImportError: | ||
lzma = None | ||
|
||
import six | ||
|
||
import rows.utils | ||
|
||
import tests.utils as utils | ||
|
@@ -71,3 +81,48 @@ def test_local_file_sample_size(self): | |
# TODO: test normalize_mime_type | ||
# TODO: test plugin_name_by_mime_type | ||
# TODO: test plugin_name_by_uri | ||
|
||
|
||
class UtilsDecompressTestCase(unittest.TestCase):
    """Round-trip tests for `rows.utils.decompress`."""

    def setUp(self):
        self.contents = six.b('Ahoy')
        # The directory must be stored under the same attribute name the
        # tests read (`self.tmp`); a mismatch here made every test fail with
        # AttributeError, as reported during review.
        self.tmp = tempfile.TemporaryDirectory()

    def tearDown(self):
        self.tmp.cleanup()

    def assert_decompresses(self, filename, open_compressed, **kwargs):
        """Write `self.contents` compressed and check it is read back.

        :param filename: name of the compressed file created in the
            temporary directory
        :param open_compressed: function used to create the compressed file
            (`bz2.open`, `gzip.open` or `lzma.open`)
        :param kwargs: extra arguments passed to `open_compressed`
        """
        compressed = os.path.join(self.tmp.name, filename)
        with open_compressed(compressed, mode='wb', **kwargs) as handler:
            handler.write(self.contents)

        decompressed = rows.utils.decompress(compressed)
        self.addCleanup(decompressed.close)
        data = decompressed.read()

        # The file object must be open in binary mode so plugins can decode
        # the data using the desired encoding.
        self.assertIsInstance(data, six.binary_type)
        self.assertEqual(self.contents, data)

    def test_decompress_with_bz2(self):
        self.assert_decompresses('test.bz2', bz2.open)

    def test_decompress_with_gz(self):
        self.assert_decompresses('test.gz', gzip.open)

    @unittest.skipIf(not lzma, 'lzma module not available')
    def test_decompress_with_lzma(self):
        # FORMAT_ALONE produces a legacy `.lzma` container; the default
        # format when writing is `.xz`, which is covered by the next test.
        self.assert_decompresses(
            'test.lzma', lzma.open, format=lzma.FORMAT_ALONE
        )

    @unittest.skipIf(not lzma, 'lzma module not available')
    def test_decompress_with_xz(self):
        self.assert_decompresses('test.xz', lzma.open)

    def test_decompress_with_incompatible_file(self):
        with tempfile.NamedTemporaryFile() as tmp:
            with self.assertRaises(RuntimeError):
                rows.utils.decompress(tmp.name)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add an `algorithm` parameter to this function? It should default to `None` (if `None`, use the file extension to define it).