Skip to content

Commit

Permalink
Merge pull request #23 from jepler/no-tracemalloc
Browse files Browse the repository at this point in the history
Improve coverage & create InvalidContentError
  • Loading branch information
jepler authored Jul 18, 2024
2 parents 9817b72 + 37f0ca2 commit 2fad09f
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 38 deletions.
18 changes: 13 additions & 5 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
# SPDX-FileCopyrightText: 2021 Jeff Epler
# SPDX-FileCopyrightText: 2021-2024 Jeff Epler
#
# SPDX-License-Identifier: GPL-3.0-only
[run]
omit =
*/site-packages/*
test*.py
[report]
exclude_also =
def __repr__
if self.debug:
if settings.DEBUG
raise AssertionError
raise NotImplementedError
if 0:
if __name__ == .__main__.:
if TYPE_CHECKING:
class .*\bProtocol\):
@(abc\.)?abstractmethod
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ jobs:
run: make mypy

- name: Test
run: python -X tracemalloc=3 -mcoverage run --branch -m unittest testleapseconddata.py && python -mcoverage report --fail-under=100 && python -mcoverage xml
run: python -mcoverage run --branch -m unittest testleapseconddata.py && python -mcoverage report --fail-under=100 && python -mcoverage xml

pre-commit:
runs-on: ubuntu-latest
Expand Down
82 changes: 50 additions & 32 deletions leapseconddata/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@
import datetime
import hashlib
import io
import itertools
import logging
import pathlib
import re
import urllib.request
from dataclasses import dataclass, field
from typing import BinaryIO, ClassVar
from typing import TYPE_CHECKING, BinaryIO, ClassVar

if TYPE_CHECKING: # pragma no cover
from collections.abc import Sequence

tai = datetime.timezone(datetime.timedelta(0), "TAI")

Expand Down Expand Up @@ -58,6 +62,10 @@ class InvalidHashError(ValueError):
"""The file hash could not be verified"""


class InvalidContentError(ValueError):
    """Raised when a line of a leap-second data file cannot be parsed."""


def _from_ntp_epoch(value: int) -> datetime.datetime:
    """Convert a count of seconds since the NTP epoch into a datetime.

    ``NTP_EPOCH`` is a module-level constant defined elsewhere in this file.
    """
    offset = datetime.timedelta(seconds=value)
    return NTP_EPOCH + offset

Expand Down Expand Up @@ -237,6 +245,7 @@ def from_standard_source(
when: datetime.datetime | None = None,
*,
check_hash: bool = True,
custom_sources: Sequence[str] = (),
) -> LeapSecondData:
"""Get the list of leap seconds from a standard source.
Expand All @@ -246,22 +255,28 @@ def from_standard_source(
Using a list of standard sources, including network sources, find a
leap-second.list data valid for the given timestamp, or the current
time (if unspecified)
If ``custom_sources`` is specified, this list of URLs is checked before
the hard-coded sources.
"""
for location in cls.standard_file_sources + cls.standard_network_sources:
for location in itertools.chain(custom_sources, cls.standard_file_sources, cls.standard_network_sources):
logging.debug("Trying leap second data from %s", location)
try:
candidate = cls.from_url(location, check_hash=check_hash)
except InvalidHashError: # pragma no cover
except InvalidHashError:
logging.warning("Invalid hash while reading %s", location)
continue
if candidate is None: # pragma no cover
except InvalidContentError as e:
logging.warning("Invalid content while reading %s: %s", location, e)
continue
if candidate.valid(when): # pragma no branch
if candidate is None:
continue
if candidate.valid(when):
logging.info("Using leap second data from %s", location)
return candidate
logging.warning("Validity expired for %s", location) # pragma no cover
logging.warning("Validity expired for %s", location)

raise ValidityError("No valid leap-second.list file could be found") # pragma no cover
raise ValidityError("No valid leap-second.list file could be found")

@classmethod
def from_file(
Expand Down Expand Up @@ -338,36 +353,39 @@ def from_open_file(cls, open_file: BinaryIO, *, check_hash: bool = True) -> Leap

hasher = hashlib.sha1()

for row in open_file:
row = row.strip() # noqa: PLW2901
if row.startswith(b"#h"):
content_hash = cls._parse_content_hash(row)
continue
for row_ws in open_file:
row = row_ws.strip()
try:
if row.startswith(b"#h"):
content_hash = cls._parse_content_hash(row)
continue

if row.startswith(b"#@"):
parts = row.split()
hasher.update(parts[1])
valid_until = _from_ntp_epoch(int(parts[1]))
continue
if row.startswith(b"#@"):
parts = row.split()
hasher.update(parts[1])
valid_until = _from_ntp_epoch(int(parts[1]))
continue

if row.startswith(b"#$"):
parts = row.split()
hasher.update(parts[1])
last_updated = _from_ntp_epoch(int(parts[1]))
continue
if row.startswith(b"#$"):
parts = row.split()
hasher.update(parts[1])
last_updated = _from_ntp_epoch(int(parts[1]))
continue

row = row.split(b"#")[0].strip() # noqa: PLW2901
content_to_hash.extend(re.findall(rb"\d+", row))
row = row.split(b"#")[0].strip()
content_to_hash.extend(re.findall(rb"\d+", row))

parts = row.split()
if len(parts) != 2: # noqa: PLR2004
continue
hasher.update(parts[0])
hasher.update(parts[1])
parts = row.split()
if len(parts) != 2: # noqa: PLR2004
continue
hasher.update(parts[0])
hasher.update(parts[1])

when = _from_ntp_epoch(int(parts[0]))
tai_offset = datetime.timedelta(seconds=int(parts[1]))
leap_seconds.append(LeapSecondInfo(when, tai_offset))
when = _from_ntp_epoch(int(parts[0]))
tai_offset = datetime.timedelta(seconds=int(parts[1]))
leap_seconds.append(LeapSecondInfo(when, tai_offset))
except Exception as e:
raise InvalidContentError(f"Failed to parse: {row!r}: {e}") from e

if check_hash:
if content_hash is None:
Expand Down
12 changes: 12 additions & 0 deletions testleapseconddata.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ def test_empty(self) -> None:
datetime.timedelta(seconds=0),
)

def test_invalid2(self) -> None:
    """Check that from_standard_source raises ValidityError when every
    supplied custom source is invalid or unreachable.

    NOTE(review): relies on the hard-coded standard sources also failing to
    satisfy the far-future timestamp -- confirm against from_standard_source.
    """
    # A timezone-aware timestamp just before datetime.MAXYEAR, so no real
    # leap-second file can possibly be valid for it.
    when = datetime.datetime(datetime.MAXYEAR, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1)
    with self.assertRaises(leapseconddata.ValidityError):
        leapseconddata.LeapSecondData.from_standard_source(
            when,
            custom_sources=[
                # base64 payload decodes to "Hello, World!" -- not leap-second
                # data, so presumably exercises the invalid-content path.
                "data:text/plain;base64,SGVsbG8sIFdvcmxkIQ==",
                # Percent-decodes to a lone "#h <hash>" line with no data rows;
                # presumably exercises the hash/validity failure path.
                "data:text/plain,%23h%099dac5845%208acd32c0%202947d462%20daf4a943%20f58d9391%0A",
                # A file URL that does not exist on disk.
                "file:///doesnotexist",
            ],
        )

def test_tz(self) -> None:
when = datetime.datetime(1999, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1)
when = when.replace(fold=True)
Expand Down

0 comments on commit 2fad09f

Please sign in to comment.