Skip to content

Commit

Permalink
fix: avoid parsing crashes by using lxml
Browse files Browse the repository at this point in the history
  • Loading branch information
vzhd1701 committed Oct 19, 2023
1 parent 7f3969b commit 26d9b8a
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 19 deletions.
4 changes: 4 additions & 0 deletions enex2notion/cli_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ def upload_notebook(self, enex_file: Path):

self.notebook_notes_count = count_notes(enex_file)

logger.debug(
f"'{enex_file.stem}' notebook contains {self.notebook_notes_count} note(s)"
)

for note_idx, note in enumerate(iter_notes(enex_file), 1):
self.upload_note(note, note_idx)

Expand Down
9 changes: 8 additions & 1 deletion enex2notion/enex_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@


# Counts the notes in enex_file by summing a 1 for every "note" element that
# iter_process_xml_elements yields.
# NOTE(review): this span comes from a rendered diff without +/- markers, so the
# one-line return and the multi-line return that follows look like the old and
# new forms of the same statement — confirm against the repository.
def count_notes(enex_file: Path) -> int:
return sum(iter_process_xml_elements(enex_file, "note", lambda e: 1))
# This form additionally passes _log_xml_errors as the fourth argument.
return sum(
iter_process_xml_elements(enex_file, "note", lambda e: 1, _log_xml_errors)
)


def _log_xml_errors(xml_file: Path, errors):
    """Log that ``xml_file`` was parsed with errors.

    A warning names the affected file; the individual error descriptions are
    written at debug level, one per line.

    Args:
        xml_file: Path of the file whose parsing produced the errors.
        errors: Iterable of error description strings.
    """
    logger.warning(f"'{xml_file.name}' file parsed with errors")
    # Error entries are expected to be single-line strings without a trailing
    # newline, so they are joined with one; an empty separator would glue all
    # entries together into one unreadable line.
    logger.debug("\n".join(errors))


def iter_notes(enex_file: Path) -> Iterator[EvernoteNote]:
Expand Down
79 changes: 62 additions & 17 deletions enex2notion/enex_parser_xml.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from collections import defaultdict
from pathlib import Path
from typing import Any, Callable, Dict, Iterator
from xml.etree import ElementTree
from typing import Any, Callable, Dict, Iterator, List, Optional

from lxml import etree
from lxml.etree import XMLSyntaxError, _Entity


def iter_xml_elements_as_dict(
Expand All @@ -13,46 +15,89 @@ def iter_xml_elements_as_dict(


# Parses xml_file incrementally and yields the callback's result for every
# element named tag_name once its "end" event is seen; formatted parser errors
# are handed to error_callback when one is given.
# NOTE(review): this span is a rendered diff with +/- markers and indentation
# stripped, so superseded and replacement lines appear side by side (e.g. the
# one-line parameter list next to the multi-line one) — confirm against the
# repository before treating it as runnable code.
def iter_process_xml_elements(
xml_file: Path, tag_name: str, handler_func: Callable[[Any], Any]
xml_file: Path,
tag_name: str,
element_callback: Callable[[Any], Any],
error_callback: Optional[Callable[[Path, List[str]], None]] = None,
) -> Iterator[Dict[str, Any]]:
with open(xml_file, "rb") as f:
context = ElementTree.iterparse(f, events=("start", "end"))
# lxml form of the iterparse call. recover=True presumably lets parsing
# continue past malformed input and resolve_entities=False presumably leaves
# entity references unexpanded — TODO confirm against the lxml documentation.
context = etree.iterparse(
f,
events=("start", "end"),
recover=True,
strip_cdata=False,
resolve_entities=False,
)

try:
# The first event supplies the root element, kept so it can be cleared later.
_, root = next(context)

_, root = next(context)
for event, elem in context:
if event == "end" and elem.tag == tag_name:
yield element_callback(elem)

for event, elem in context:
if event == "end" and elem.tag == tag_name:
yield handler_func(elem)
# Presumably drops already-processed children so memory stays bounded — TODO confirm.
root.clear()
except XMLSyntaxError:
# Syntax errors are not re-raised; messages are read from context.error_log below.
pass
except Exception as e:
# Any other failure is re-raised as RuntimeError, chained to the original cause.
raise RuntimeError(f"Failed to parse {xml_file.name}") from e

root.clear()
# Invoke the callback only when there are errors and a callback was supplied.
errors = _format_error_list(xml_file.name, context.error_log)
if errors and error_callback:
error_callback(xml_file, errors)


# https://stackoverflow.com/a/10077069/13100286
# Converts element t into a dict keyed by its tag: child elements become nested
# entries, attributes get an "@" prefix, and text is stored either as the value
# itself or under "#text".
# NOTE(review): this span is a rendered diff with +/- markers and indentation
# stripped, so superseded and replacement lines appear side by side (two def
# headers, two attribute/text sections) — confirm against the repository.
def _etree_to_dict( # noqa: WPS210, WPS231, C901
t: ElementTree.Element,
) -> Dict[str, Any]:
def _etree_to_dict(t) -> Dict[str, Any]: # noqa: WPS210, WPS231, C901
# Start with an empty dict when the element has attributes, otherwise None.
d = {t.tag: {} if t.attrib else None}
children = list(t)

# Entity nodes are separated from the other children; their text is merged
# into this element's text further down instead of becoming nested keys.
children = list(c for c in t if not isinstance(c, _Entity))
children_entities = list(c for c in t if isinstance(c, _Entity))

if children:
# Group the converted children by tag name.
dd = defaultdict(list)

for dc in map(_etree_to_dict, children):
for k, v in dc.items():
dd[k].append(v)

# A tag that occurs once maps to its single value; repeated tags keep the list.
d = {
t.tag: {
k: v[0] if len(v) == 1 else v # noqa: WPS441
for k, v in dd.items() # noqa: WPS221
}
}

if t.attrib:
# Attributes are stored next to the children under "@"-prefixed keys.
d[t.tag].update(
(f"@{k}", v) for k, v in t.attrib.items() # noqa: WPS221, WPS441
)
if t.text:
text = t.text.strip()
(f"@{k}", v) for k, v in t.attrib.items()
) # noqa: WPS221, WPS441

if t.text or children_entities:
# Element text followed by each entity's text and tail, stripped as a whole.
text = "".join([t.text or "", *_iter_entities_text(children_entities)]).strip()
if children or t.attrib:
# With children or attributes present, non-empty text goes under "#text".
if text:
d[t.tag]["#text"] = text
else:
# Otherwise the text itself becomes the element's value.
d[t.tag] = text

return d


def _iter_entities_text(entities):
    """Yield the ``text`` and then the ``tail`` of every entity, in order.

    Both attributes are read through ``_handle_bad_unicode_attr``; whatever
    that helper returns for an attribute is yielded unchanged.
    """
    for entity in entities:
        for attr_name in ("text", "tail"):
            yield _handle_bad_unicode_attr(entity, attr_name)


def _handle_bad_unicode_attr(obj, attr):
    """Read ``attr`` from ``obj`` as text, tolerating undecodable values.

    Args:
        obj: Object to read from.
        attr: Name of the attribute to read.

    Returns:
        The attribute value when it can be read; the UTF-8 decoding of the raw
        bytes with invalid sequences dropped when reading raised
        ``UnicodeDecodeError``; ``""`` when the value is ``None`` or reading
        failed with any other ordinary exception.
    """
    try:
        value = getattr(obj, attr)
    except UnicodeDecodeError as e:
        # The exception carries the raw bytes; keep whatever decodes cleanly.
        return e.object.decode("utf-8", "ignore")
    except Exception:  # pragma: no cover
        # Best effort: an unreadable attribute counts as empty text. Only
        # ``Exception`` is caught so KeyboardInterrupt/SystemExit still propagate.
        return ""
    # Always hand back a string so callers can join the results safely.
    return "" if value is None else value


def _format_error_list(file_name, error_log) -> List[str]:
    """Render each log entry as ``"<file_name>:<line>:<column>:<message>"``.

    Entries are formatted in the order ``error_log`` yields them; each entry
    must expose ``line``, ``column`` and ``message`` attributes.
    """
    formatted = []
    for entry in error_log:
        formatted.append(
            f"{file_name}:{entry.line}:{entry.column}:{entry.message}"
        )
    return formatted
109 changes: 108 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pdfkit = "^1.0.0"
PyMuPDF = "^1.23.5"
notion-vzhd1701-fork = "0.0.37"
tqdm = "^4.66.1"
lxml = "^4.9.3"

[tool.poetry.group.test]
optional = true
Expand Down
Loading

0 comments on commit 26d9b8a

Please sign in to comment.