-
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
251 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
# --------------------------------------------------------------------- | ||
# Gufo ACME: PowerDnsAcmeClient implementation | ||
# --------------------------------------------------------------------- | ||
# Copyright (C) 2023, Gufo Labs | ||
# --------------------------------------------------------------------- | ||
"""A PowerDnsAcmeClient implementation.""" | ||
|
||
# Python modules | ||
import hashlib | ||
from typing import Any | ||
|
||
import httpx | ||
|
||
# Third-party modules | ||
from josepy.json_util import encode_b64jose | ||
|
||
from ..error import ACMEFulfillmentFailed | ||
|
||
# Gufo ACME modules | ||
from ..log import logger | ||
from ..types import ACMEChallenge | ||
from .base import ACMEClient | ||
|
||
RESP_NO_CONTENT = 204 | ||
|
||
|
||
class PowerDnsAcmeClient(ACMEClient): | ||
""" | ||
PowerDNS compatible ACME Client. | ||
Fulfills dns-01 challenge by manipulating | ||
DNS RR via PowerDNS API. | ||
Args: | ||
api_url: Root url of the PowerDNS web. | ||
api_key: PowerDNS API key. | ||
""" | ||
|
||
def __init__( | ||
self: "PowerDnsAcmeClient", | ||
directory_url: str, | ||
*, | ||
api_url: str, | ||
api_key: str, | ||
**kwargs: Any, | ||
) -> None: | ||
super().__init__(directory_url, **kwargs) | ||
self.api_url = self._normalize_url(api_url) | ||
self.api_key = api_key | ||
|
||
@staticmethod | ||
def _normalize_url(url: str) -> str: | ||
if url.endswith("/"): | ||
return url[:-1] | ||
return url | ||
|
||
@staticmethod | ||
def _check_api_response(resp: httpx.Response) -> None: | ||
if resp.status_code != RESP_NO_CONTENT: | ||
msg = f"Failed to fulfill: Server returned {resp}" | ||
logger.error(msg) | ||
raise ACMEFulfillmentFailed(msg) | ||
|
||
async def fulfill_dns_01( | ||
self: "PowerDnsAcmeClient", domain: str, challenge: ACMEChallenge | ||
) -> bool: | ||
""" | ||
Fulfill dns-01 challenge. | ||
Update token via PowerDNS API. | ||
Args: | ||
domain: Domain name | ||
challenge: ACMEChallenge instance, containing token. | ||
Returns: | ||
True - on succeess. | ||
Raises: | ||
ACMEFulfillmentFailed: On error. | ||
""" | ||
# Calculate value | ||
v = encode_b64jose( | ||
hashlib.sha256(self.get_key_authorization(challenge)).digest() | ||
) | ||
# Set PDNS challenge | ||
async with self._get_client() as client: | ||
# Construct the API endpoint for updating a record in a specific zone | ||
endpoint = ( | ||
f"{self.api_url}/api/v1/servers/localhost/zones/{domain}" | ||
) | ||
# Set up the headers, including the API key for authentication | ||
headers = { | ||
"X-API-Key": self.api_key, | ||
"Content-Type": "application/json", | ||
} | ||
# Prepare the payload for the update | ||
update_payload = { | ||
"rrsets": [ | ||
{ | ||
"name": f"_acme-challenge.{domain}.", | ||
"type": "TXT", | ||
"ttl": 1, | ||
"changetype": "REPLACE", | ||
"records": [ | ||
{ | ||
"content": f'"{v}"', | ||
"disabled": False, | ||
} | ||
], | ||
} | ||
] | ||
} | ||
resp = await client.patch( | ||
endpoint, json=update_payload, headers=headers | ||
) | ||
self._check_api_response(resp) | ||
return True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
# --------------------------------------------------------------------- | ||
# CSR Proxy: PowerDnsAcmeClient client tests | ||
# --------------------------------------------------------------------- | ||
# Copyright (C) 2023, Gufo Labs | ||
# --------------------------------------------------------------------- | ||
|
||
# Python modules | ||
import asyncio | ||
import os | ||
|
||
# Third-party modules | ||
import httpx | ||
import pytest | ||
|
||
# Gufo ACME modules | ||
from gufo.acme.clients.base import ACMEClient | ||
from gufo.acme.clients.powerdns import PowerDnsAcmeClient | ||
from gufo.acme.error import ACMEFulfillmentFailed | ||
|
||
from .utils import DIRECTORY, EMAIL, get_csr_pem, not_set, not_set_reason | ||
|
||
ENV_CI_POWERDNS_TEST_DOMAIN = "CI_POWERDNS_TEST_DOMAIN" | ||
ENV_CI_POWERDNS_TEST_API_URL = "CI_POWERDNS_TEST_API_URL" | ||
ENV_CI_POWERDNS_TEST_API_KEY = "CI_POWERDNS_TEST_API_KEY" | ||
SCENARIO_ENV = [ | ||
ENV_CI_POWERDNS_TEST_DOMAIN, | ||
ENV_CI_POWERDNS_TEST_API_URL, | ||
ENV_CI_POWERDNS_TEST_API_KEY, | ||
] | ||
|
||
|
||
@pytest.mark.skipif(not_set(SCENARIO_ENV), reason=not_set_reason(SCENARIO_ENV)) | ||
def test_sign(): | ||
async def inner(): | ||
csr_pem = get_csr_pem(domain) | ||
# | ||
pk = PowerDnsAcmeClient.get_key() | ||
async with PowerDnsAcmeClient( | ||
DIRECTORY, | ||
api_url=os.getenv(ENV_CI_POWERDNS_TEST_API_URL), | ||
api_key=os.getenv(ENV_CI_POWERDNS_TEST_API_KEY), | ||
key=pk, | ||
) as client: | ||
# Register account | ||
uri = await client.new_account(EMAIL) | ||
assert uri | ||
# Create new order | ||
cert = await client.sign(domain, csr_pem) | ||
# Deactivate account | ||
await client.deactivate_account() | ||
assert cert | ||
assert b"BEGIN CERTIFICATE" in cert | ||
assert b"END CERTIFICATE" in cert | ||
|
||
domain = os.getenv(ENV_CI_POWERDNS_TEST_DOMAIN) or "" | ||
asyncio.run(inner()) | ||
|
||
|
||
def test_state(): | ||
client = ACMEClient( | ||
DIRECTORY, | ||
key=PowerDnsAcmeClient.get_key(), | ||
) | ||
state = client.get_state() | ||
client2 = PowerDnsAcmeClient.from_state( | ||
state, api_url="https://127.0.0.1/", api_key="xxx" | ||
) | ||
assert isinstance(client2, PowerDnsAcmeClient) | ||
|
||
|
||
@pytest.mark.parametrize( | ||
("url", "expected"), | ||
[ | ||
("https://example.com", "https://example.com"), | ||
("https://example.com/", "https://example.com"), | ||
], | ||
) | ||
def test_normalize_url(url: str, expected: str) -> None: | ||
r = PowerDnsAcmeClient._normalize_url(url) | ||
assert r == expected | ||
|
||
|
||
def test_invalid_response(): | ||
resp = httpx.Response(200) | ||
with pytest.raises(ACMEFulfillmentFailed): | ||
PowerDnsAcmeClient._check_api_response(resp) |