forked from cannium/Sree
-
Notifications
You must be signed in to change notification settings - Fork 0
/
xmlparser.py
93 lines (84 loc) · 2.79 KB
/
xmlparser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# XML parser code copied from s3cmd
# http://github.com/s3tools/s3cmd
# Licensed under GPLv2
import re
try:
import xml.etree.ElementTree as ET
except ImportError:
import elementtree.ElementTree as ET
from xml.parsers.expat import ExpatError
def parseNodes(nodes):
    """
    parseNodes(nodes) -- convert a sequence of elements into a list of dicts

    Every element in 'nodes' produces one dict keyed by the tags of its
    direct children. A child that has children of its own is converted
    recursively and stored as a one-item list; a leaf child is stored as
    the text found for its tag.
    """
    ## WARNING: Ignores text nodes from mixed xml/text.
    ## For instance <tag1>some text<tag2>other text</tag2></tag1>
    ## will ignore the "some text" node
    retval = []
    for node in nodes:
        retval_item = {}
        # Iterate the element itself: Element.getchildren() was deprecated
        # and then removed in Python 3.9.
        for child in node:
            name = child.tag
            if len(child):
                retval_item[name] = parseNodes([child])
            else:
                # NOTE: ".//" searches all descendants of 'node', so this is
                # the text of the first element with this tag in document
                # order, which is not necessarily 'child' itself.
                retval_item[name] = node.findtext(".//%s" % name)
        retval.append(retval_item)
    return retval
# Leading "<...>" prolog (normally the XML declaration), an optional
# whitespace character, then the root tag carrying an http:// xmlns attribute.
# Groups: 1 = prolog, 2 = "<RootTag", 3 = namespace URI, 4 = rest of the line.
_XMLNS_RE = re.compile(
    r'^(<?[^>]+?>\s?)(<\w+) xmlns=[\'"](http://[^\'"]+)[\'"](.*)',
    re.MULTILINE)


def stripNameSpace(xml):
    """
    stripNameSpace(xml) -- remove top-level AWS namespace

    Returns a tuple (xml, xmlns). When the document starts with a prolog
    followed by a root tag declaring xmlns="http://...", the attribute is
    removed from the returned text and its URI is returned as 'xmlns'.
    Otherwise the text is returned unchanged and 'xmlns' is None.
    """
    found = _XMLNS_RE.match(xml)
    if found is None:
        return xml, None
    # count=1: the pattern is compiled with MULTILINE, so an unbounded sub()
    # would also rewrite later lines that happen to start the same way.
    # Only the top-level declaration must be stripped.
    stripped = _XMLNS_RE.sub("\\1\\2\\4", xml, count=1)
    return stripped, found.group(3)
def getTreeFromXml(xml):
    """
    getTreeFromXml(xml) -- parse an XML document into an ElementTree element

    The top-level namespace declaration is stripped before parsing (see
    stripNameSpace) so tags can be looked up without a namespace prefix.
    If a namespace was stripped, it is stored back on the root element as
    a plain 'xmlns' attribute.

    Raises Exception("Bucket contains invalid filenames.") when the
    document is not well-formed XML.
    """
    xml, xmlns = stripNameSpace(xml)
    # ElementTree 1.3+ reports malformed input as ET.ParseError instead of
    # ExpatError. Older copies of the library lack that attribute, hence the
    # getattr() fallback.
    parse_errors = (ExpatError, getattr(ET, "ParseError", ExpatError))
    try:
        tree = ET.fromstring(xml)
    except parse_errors:
        raise Exception("Bucket contains invalid filenames.")
    if xmlns:
        tree.attrib['xmlns'] = xmlns
    return tree
def getListFromXml(xml, node):
    """
    getListFromXml(xml, node) -- list of dicts for every 'node' element

    Parses 'xml', collects all descendants of the root whose tag is 'node'
    and converts each of them with parseNodes().
    """
    matches = getTreeFromXml(xml).findall('.//%s' % (node))
    return parseNodes(matches)
def getDictFromTree(tree):
    """
    getDictFromTree(tree) -- convert an element's children into a dict

    Keys are the child tags. A child with children of its own becomes a
    nested dict; a leaf child becomes its text, or "" when it has none.
    When a tag occurs more than once, its values are collected into a list
    in document order.
    """
    ret_dict = {}
    # Iterate the element itself: Element.getchildren() was removed in
    # Python 3.9.
    for child in tree:
        if len(child):
            ## Complex-type child. Recurse
            content = getDictFromTree(child)
        else:
            content = child.text
        content = content or ""
        tag = child.tag
        if tag not in ret_dict:
            ret_dict[tag] = content
            continue
        # Repeated tag: promote the stored value to a list on the first
        # repeat, then keep appending.
        if not isinstance(ret_dict[tag], list):
            ret_dict[tag] = [ret_dict[tag]]
        ret_dict[tag].append(content)
    return ret_dict
def getTextFromXml(xml, xpath):
    """
    getTextFromXml(xml, xpath) -- decoded text of one element, or None

    If the root tag ends with 'xpath', the root's own text is used;
    otherwise the text of the first element matching 'xpath' below the
    root. Returns None when no text is available.
    """
    root = getTreeFromXml(xml)
    text = root.text if root.tag.endswith(xpath) else root.findtext(xpath)
    if text is None:
        return None
    return decode_from_s3(text)
# Text type of the running interpreter: 'unicode' on Python 2, 'str' on
# Python 3 (where the name 'unicode' does not exist).
try:
    _TEXT_TYPE = unicode
except NameError:
    _TEXT_TYPE = str


def decode_from_s3(string, errors="replace"):
    """
    Convert S3 UTF-8 'string' to Unicode or raise an exception.

    Text that is already Unicode is returned unchanged. 'errors' is the
    codec error handler used for byte strings; with the default "replace"
    undecodable bytes are substituted rather than reported.

    Raises UnicodeDecodeError when decoding fails under a strict handler.
    """
    if isinstance(string, _TEXT_TYPE):
        return string
    # Be quiet by default
    #debug("Decoding string from S3: %r" % string)
    try:
        return _TEXT_TYPE(string, "UTF-8", errors)
    except UnicodeDecodeError as err:
        # UnicodeDecodeError needs (encoding, object, start, end, reason).
        # Building it from a lone message raises TypeError instead.
        raise UnicodeDecodeError(
            err.encoding, err.object, err.start, err.end,
            "Conversion to unicode failed: %r" % string)