-
Notifications
You must be signed in to change notification settings - Fork 5
/
desec.py
executable file
·2293 lines (1929 loc) · 80.9 KB
/
desec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""Simple API client for desec.io.
It can be used as a standalone CLI tool or as a python module.
For more information on the CLI, run it with the --help parameter.
For more information on the module's classes and functions, refer to the respective
docstrings.
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import re
import sys
import time
import typing as t
from datetime import datetime, timezone
from enum import IntEnum
from hashlib import sha256, sha512
from pprint import pprint
import requests
try:
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
cryptography_available = True
except ModuleNotFoundError:
cryptography_available = False
try:
import dns.name
from dns import rdatatype, zone
dnspython_available = True
except ModuleNotFoundError:
dnspython_available = False
if t.TYPE_CHECKING:
import pathlib
__version__ = "0.0.0"
DnsRecordTypeType = t.Literal[
"A",
"AAAA",
"AFSDB",
"APL",
"CAA",
"CDNSKEY",
"CDS",
"CERT",
"CNAME",
"DHCID",
"DNAME",
"DNSKEY",
"DLV",
"DS",
"EUI48",
"EUI64",
"HINFO",
"HTTPS",
"KX",
"L32",
"L64",
"LOC",
"LP",
"MX",
"NAPTR",
"NID",
"NS",
"OPENPGPKEY",
"PTR",
"RP",
"SMIMEA",
"SPF",
"SRV",
"SSHFP",
"SVCB",
"TLSA",
"TXT",
"URI",
]
JsonGenericType = t.Union[
None,
int,
float,
str,
bool,
t.Sequence["JsonGenericType"],
t.Mapping[str, "JsonGenericType"],
]
class JsonTokenType(t.TypedDict):
"""API token information."""
allowed_subnets: list[str]
auto_policy: bool
created: str
id: str
is_valid: bool
last_used: str | None
max_age: str | None
max_unused_period: str | None
name: str
perm_create_domain: bool
perm_delete_domain: bool
perm_manage_tokens: bool
class JsonTokenSecretType(JsonTokenType):
"""API token information including the secret token value."""
token: str
class JsonTokenPolicyType(t.TypedDict):
"""API token policy information."""
id: str
domain: str | None
subname: str | None
type: str | None
perm_write: bool
class JsonDNSSECKeyInfoType(t.TypedDict):
"""DNSSEC public key information."""
dnskey: str
ds: list[str]
flags: int
keytype: str
managed: bool
class JsonDomainType(t.TypedDict):
"""Domain information."""
created: str
minimum_ttl: int
name: str
published: str
touched: str
class JsonDomainWithKeysType(JsonDomainType):
"""Domain information including DNSSEC public key information."""
keys: list[JsonDNSSECKeyInfoType]
class JsonRRsetWritableType(t.TypedDict):
"""Writable fields of RRset information."""
records: list[str]
subname: str
ttl: t.NotRequired[int]
type: DnsRecordTypeType
class JsonRRsetType(JsonRRsetWritableType):
"""RRset information."""
created: str
domain: str
name: str
touched: str
class JsonRRsetFromZonefileType(JsonRRsetWritableType):
"""RRset information parsed from a zone file."""
name: str
error_msg: t.NotRequired[str]
error_recovered: t.NotRequired[bool]
API_BASE_URL = "https://desec.io/api/v1"
RECORD_TYPES = t.get_args(DnsRecordTypeType)
class ExitCode(IntEnum):
"""Error codes use by the CLI tool and API related exceptions."""
OK = 0
GENERIC_ERROR = 1
INVALID_PARAMETERS = 3
API = 4
AUTH = 5
NOT_FOUND = 6
TLSA_CHECK = 7
RATE_LIMIT = 8
PERMISSION = 9
class DesecClientError(Exception):
"""Exception for all errors within the client."""
error_code = ExitCode.GENERIC_ERROR
class ParameterCheckError(DesecClientError):
"""Exception for parameter consistency check errors."""
error_code = ExitCode.INVALID_PARAMETERS
class TLSACheckError(DesecClientError):
"""Exception for TLSA record setup consistency check errors."""
error_code = ExitCode.TLSA_CHECK
class APIExpectationError(DesecClientError):
"""Exception for errors that are caused by unmet expectations in API responses."""
error_code = ExitCode.GENERIC_ERROR
class APIError(DesecClientError):
"""Exception for errors returned by the API.
If initialized with a HTTP response, an attempt is made to parse error information from
the response and include it in the string representation of this exception, replacing
the `{detail}` placeholder in the message template.
Args:
response: HTTP response from the deSEC API that caused this exception.
"""
error_code = ExitCode.API
message_template = "Unexpected error code {code}: {detail}"
def __init__(self, response: requests.Response):
self._response = response
def __str__(self) -> str:
"""Return a string representation of this exception.
The formatting is based on the message template and takes the HTTP response into
account.
The message template may contain the following placeholders:
* `code`: Replaced by the HTTP status code.
* `detail`: Replaced by the error message from the HTTP response, if it can be
parsed.
Returns:
A human-readable text representation of the error condition.
"""
if self._response.headers["Content-Type"] == "application/json":
json_data = self._response.json()
if not isinstance(json_data, list):
json_data = [json_data]
detail = ""
for entry in json_data:
try:
detail += t.cast(dict[t.Literal["detail"], str], entry)["detail"] + "\n"
except KeyError:
for attribute, messages in entry.items():
detail += attribute + ":\n " + " \n".join(messages) + "\n"
detail = detail.rstrip()
else:
detail = self._response.text
return self.message_template.format(code=self._response.status_code, detail=detail)
class AuthenticationError(APIError):
"""Exception for authentication failure."""
error_code = ExitCode.AUTH
message_template = "Authentication error: {detail}"
class NotFoundError(APIError):
"""Exception when data can not be found."""
error_code = ExitCode.NOT_FOUND
message_template = "{detail}"
class ParameterError(APIError):
"""Exception for invalid parameters, such as DNS records."""
error_code = ExitCode.INVALID_PARAMETERS
message_template = "Invalid parameter(s):\n{detail}"
class ConflictError(APIError):
"""Exception for conflicts returned by the API."""
error_code = ExitCode.INVALID_PARAMETERS
message_template = "Conflict:\n{detail}"
class RateLimitError(APIError):
"""Exception for API rate limits."""
error_code = ExitCode.RATE_LIMIT
message_template = "Rate limited: {detail}"
class TokenPermissionError(APIError):
"""Exception for API insufficient token permissions."""
error_code = ExitCode.PERMISSION
message_template = "Restricted token: {detail}"
class TokenAuth(requests.auth.AuthBase):
"""Token-based authentication for requests.
Custom authentication hook for requests to handle token-based authentication as
required by the deSEC API.
Args:
token: The authentication token value.
"""
def __init__(self, token: str):
self.token = token
def __call__(self, r: requests.PreparedRequest) -> requests.PreparedRequest:
"""Attaches token-based Authorization header to a given Request object."""
r.headers["Authorization"] = f"Token {self.token}"
return r
class TLSAField:
"""Abstract class for handling TLSA fields.
This class (or its subclasses) allow using numeric values and symbolic names
interchangeably.
Args:
value: The field value this objects represents. May be numeric or symbolic.
Raises:
ValueError: The supplied value is not valid for this type of field.
"""
valid_values: tuple[str, ...]
def __init__(self, value: str | int):
try:
value = self.valid_values.index(str(value).upper())
except ValueError:
pass
self._value = int(value)
try:
self.valid_values[self._value]
except IndexError as e: # pragma: no cover
raise ValueError(f"Invalid type {value} for {self.__class__}") from e
def __eq__(self, other: object) -> bool:
if isinstance(other, int):
return self._value == other
elif isinstance(other, str):
return self.valid_values[self._value] == other.upper()
elif isinstance(other, self.__class__):
return self._value == other._value
return False # pragma: no cover
def __repr__(self) -> str:
return self.valid_values[self._value]
def __int__(self) -> int:
return self._value
class TLSAUsage(TLSAField):
"""TLSA certificate usage information."""
valid_values = ("PKIX-TA", "PKIX-EE", "DANE-TA", "DANE-EE")
class TLSASelector(TLSAField):
"""TLSA selector."""
valid_values = ("CERT", "SPKI")
class TLSAMatchType(TLSAField):
"""TLSA match type."""
valid_values = ("FULL", "SHA2-256", "SHA2-512")
class APIClient:
"""deSEC.io API client.
Args:
token: API authorization token
request_timeout: HTTP request timeout in seconds. Note that the timeout is applied
to individual HTTP requests and the methods of this class may make multiple
requests. Set to `None` to disable.
retry_limit: Number of retries when hitting the API's rate limit.
Set to 0 to disable.
logger: Logger instance to send HTTP debug information to. Defaults to the named
logger `desec.client`.
"""
def __init__(
self,
token: str,
request_timeout: int | None = 15,
retry_limit: int = 3,
logger: logging.Logger = logging.getLogger("desec.client"), # noqa: B008
):
self._token_auth = TokenAuth(token)
self._request_timeout = request_timeout
self._retry_limit = retry_limit
self.logger = logger
"Logger instance to send HTTP debug information to."
@staticmethod
def _get_response_content(response: requests.Response) -> JsonGenericType:
"""Safely get content from a response.
Args:
response: requests Response object
Returns:
If the response body contains JSON data, it is parsed into the respective
Python data structures. Otherwise the response body as a string.
"""
content_type = response.headers.get("Content-Type")
if content_type == "text/dns":
return response.text
elif content_type == "application/json":
try:
return response.json()
except ValueError: # pragma: no cover
return response.text
else:
return response.text
@t.overload
def query(
self,
method: t.Literal["DELETE", "GET"],
url: str,
data: t.Mapping[str, str | int | float | bool | None] | None = None,
) -> JsonGenericType | str: ...
@t.overload
def query(
self, method: t.Literal["PATCH", "POST", "PUT"], url: str, data: JsonGenericType = None
) -> JsonGenericType | str: ...
def query(
self,
method: t.Literal["DELETE", "GET", "PATCH", "POST", "PUT"],
url: str,
data: JsonGenericType = None,
) -> JsonGenericType | str:
"""Query the API.
This method handles low-level queries to the deSEC API and should not be used
directly. Prefer the more high-level methods that implement specific API functions
instead.
If the initial request hits the API's rate limit, it is retired up to
`self._retry_limit` times, after waiting for the interval returned by the API.
Unless another process is using the API in parallel, no more than one retry
should be needed.
If the API refuses to answer the query because it would return more data than the
API's limit for a single response, the query is retried in pagination mode. This
means that the API is queries repeatedly until all results are retrieved. The
responses are merged and returned in a single list.
Args:
method: HTTP method to use.
url: Target URL to query.
data: Data to send in either the body or as URL parameters (for HTTP methods
that do not support body parameters). URL parameters must be supplied as a
simple key-value dictionary while body parameters may be more complex JSON
structures.
Returns:
The response body.
If the response body contains JSON data, it is parsed into the respective Python
data structures.
If the response body is empty, `None` is returned.
Raises:
ParameterError: The API returned status code 400 (Bad Request).
Request parameters were incorrect or invalid.
AuthenticationError: The API returned status code 401 (Unauthorized).
The supplied authentication token is not valid for this query (e.g. the
domain is not managed by this account).
TokenPermissionError: The API returned status code 403 (Forbidden).
The requested operation is not allowed for the given token (or account).
NotFoundError: The API returned status code (Not Found).
The object to operate on was not found (e.g. the domain or RRset).
ConflictError: The API returned status code 409 (Conflict).
The requested operation conflicts with existing data or a deSEC policy.
RateLimitError: The API returned status code 429 (Too Many Requests).
The request hit the API's rate limit. Retries up to the configured limit
were made, but also hit the rate limit.
APIError: The API returned an unexpected error.
requests.Timeout: The API failed to reply to an HTTP request within the time
limit.
"""
if method == "GET" or method == "DELETE":
params = t.cast("t.Mapping[str, str | int | float | bool | None] | None", data)
body = None
else:
params = None
body = data
merged_result = []
next_url: str | None = url
r = requests.Response() # Without this line, mypy considers r as possibly undefined.
while next_url is not None:
retry_after = 0
# Loop until we do not hit the rate limit (or we reach retry_limit + 1
# iterations). Ideally, that should be only one or two iterations.
for _ in range(max(1, self._retry_limit + 1)):
# If we did hit the rate limit on the previous iteration, wait until it
# expires.
time.sleep(retry_after)
# Send the request.
self.logger.debug(
f"Request: {method} {url}",
extra=dict(method=method, url=url, params=params, body=body),
)
r = requests.request(
method,
next_url,
auth=self._token_auth,
params=params,
json=body,
timeout=self._request_timeout,
)
self.logger.debug(
f"Response: {r.status_code} for {method} {url}",
extra=dict(
response_code=r.status_code,
response_body=self._get_response_content(r),
),
)
if r.status_code != 429:
# Not rate limited. Response is handled below.
break
# Handle rate limiting. See https://desec.readthedocs.io/en/latest/rate-limits.html
try:
retry_after = int(r.headers["Retry-After"])
except (KeyError, ValueError) as e: # pragma: no cover
# Retry-After header is missing or not an integer. This should never
# happen.
raise RateLimitError(response=r) from e
else:
# Reached retry_limit (or it is 0) without any other response than 429.
raise RateLimitError(response=r)
# Handle pagination. The API returns a "Link" header if the query requires
# pagination. If the status code is 400, that means we did not request
# pagination, but should have done so.
# In this case, we redo the request using the "fist" URL from the "Link" header.
# Otherwise, we use the "next" URL to get the next set of results.
# Reference: https://desec.readthedocs.io/en/latest/dns/rrsets.html#pagination
if "Link" in r.headers:
if r.status_code == 400:
# Pagination is required. The "first" URL points to the starting point.
links = self.parse_links(r.headers["Link"])
next_url = links["first"]
else:
# We got partial results from pagination. Merge them with the results we
# already have and go on with the "next" URL (if any).
merged_result.extend(r.json())
links = self.parse_links(r.headers["Link"])
next_url = links.get("next")
else:
# No pagination -> no further requests.
break
if r.status_code == 400:
raise ParameterError(response=r)
elif r.status_code == 401:
raise AuthenticationError(response=r)
elif r.status_code == 403:
raise TokenPermissionError(response=r)
elif r.status_code == 404:
raise NotFoundError(response=r)
elif r.status_code == 409:
raise ConflictError(response=r)
elif r.status_code >= 400: # pragma: no cover
raise APIError(response=r)
# Get Header: Content-Type
try:
content_type = r.headers["Content-Type"]
except KeyError:
content_type = None
# Process response data according to content-type.
response_data: JsonGenericType
if content_type == "text/dns":
response_data = r.text
elif content_type == "application/json":
if merged_result:
# Merged results from paginated queries don't need further processing.
response_data = merged_result
else:
try:
response_data = r.json()
except ValueError: # pragma: no cover
response_data = None
else:
response_data = None
return response_data
def parse_links(self, links: str) -> dict[str, str]:
"""Parse `Link:` response header used for pagination.
See https://desec.readthedocs.io/en/latest/dns/rrsets.html#pagination
Args:
links: `Link:` header returned by the API.
Returns:
A dictionary containing the URLs from the header, indexed by their respective
`rel` attribute. In other words, the "next" attribute references the URL that
returns the next portion of the requested data.
Raises:
APIExpectationError: Parsing the given `Link` response header failed.
"""
mapping = {}
for link in links.split(", "):
_url, label = link.split("; ")
m = re.search('rel="(.*)"', label)
if m is None:
raise APIExpectationError("Unexpected format in Link header")
label = m.group(1)
_url = _url[1:-1]
mapping[label] = _url
return mapping
def list_tokens(self) -> list[JsonTokenType]:
"""Return information about all current tokens.
See https://desec.readthedocs.io/en/latest/auth/tokens.html#retrieving-all-current-tokens
Returns:
A list of tokens that exist for the current account. Each token is returned as a
dictionary containing all available token metadata. Note that the actual token
values are not included, as the API does not return them.
Raises:
AuthenticationError: The token used for authentication is invalid.
TokenPermissionError: The token used for authentication does not have the
"perm_manage_tokens" attribute.
APIError: The API returned an unexpected error.
"""
url = f"{API_BASE_URL}/auth/tokens/"
data = self.query("GET", url)
return t.cast(list[JsonTokenType], data)
def create_token(
self,
name: str = "",
manage_tokens: bool | None = None,
create_domain: bool | None = None,
delete_domain: bool | None = None,
allowed_subnets: list[str] | None = None,
auto_policy: bool | None = None,
) -> JsonTokenSecretType:
"""Create a new authentication token.
See https://desec.readthedocs.io/en/latest/auth/tokens.html#create-additional-tokens
Args:
name: Set the "name" attribute of the new token to this value.
manage_tokens: Set the "perm_manage_tokens" attribute of the new token to this
value.
create_domain: Set the "perm_create_domain" attribute of the new token to this
value.
delete_domain: Set the "perm_delete_domain" attribute of the new token to this
value.
allowed_subnets: Set the "allowed_subnets" attribute of the new token to this
value.
auto_policy: Set the "auto_policy" attribute of the new token to this value.
Returns:
A dictionary containing all metadata of the newly created token as well as the
token value itself.
Raises:
AuthenticationError: The token used for authentication is invalid.
TokenPermissionError: The token used for authentication does not have the
"perm_manage_tokens" attribute.
APIError: The API returned an unexpected error.
"""
url = f"{API_BASE_URL}/auth/tokens/"
request_data: JsonGenericType
request_data = {"name": name}
if manage_tokens is not None:
request_data["perm_manage_tokens"] = manage_tokens
if create_domain is not None:
request_data["perm_create_domain"] = create_domain
if delete_domain is not None:
request_data["perm_delete_domain"] = delete_domain
if allowed_subnets is not None:
request_data["allowed_subnets"] = allowed_subnets
if auto_policy is not None:
request_data["auto_policy"] = auto_policy
data = self.query("POST", url, request_data)
return t.cast(JsonTokenSecretType, data)
def modify_token(
self,
token_id: str,
name: str | None = None,
manage_tokens: bool | None = None,
create_domain: bool | None = None,
delete_domain: bool | None = None,
allowed_subnets: list[str] | None = None,
auto_policy: bool | None = None,
) -> JsonTokenType:
"""Modify an existing authentication token.
See https://desec.readthedocs.io/en/latest/auth/tokens.html#modifying-a-token
Args:
token_id: The unique id of the token to modify.
name: Set the "name" attribute of the target token to this value.
manage_tokens: Set the "perm_manage_tokens" attribute of the target token to
this value.
create_domain: Set the "perm_create_domain" attribute of the new token to this
value.
delete_domain: Set the "perm_delete_domain" attribute of the new token to this
value.
allowed_subnets: Set the "allowed_subnets" attribute of the target token to this
value.
auto_policy: Set the "auto_policy" attribute of the target token to this value.
Returns:
A dictionary containing all metadata of the changed token, not including the
token value itself.
Raises:
AuthenticationError: The token used for authentication is invalid.
TokenPermissionError: The token used for authentication does not have the
"perm_manage_tokens" attribute.
APIError: The API returned an unexpected error.
"""
url = f"{API_BASE_URL}/auth/tokens/{token_id}/"
request_data: JsonGenericType
request_data = {}
if name is not None:
request_data["name"] = name
if manage_tokens is not None:
request_data["perm_manage_tokens"] = manage_tokens
if create_domain is not None:
request_data["perm_create_domain"] = create_domain
if delete_domain is not None:
request_data["perm_delete_domain"] = delete_domain
if allowed_subnets is not None:
request_data["allowed_subnets"] = allowed_subnets
if auto_policy is not None:
request_data["auto_policy"] = auto_policy
data = self.query("PATCH", url, request_data)
return t.cast(JsonTokenType, data)
def delete_token(self, token_id: str) -> None:
"""Delete an authentication token.
See https://desec.readthedocs.io/en/latest/auth/tokens.html#delete-tokens
Args:
token_id: The unique id of the token to delete.
Raises:
AuthenticationError: The token used for authentication is invalid.
TokenPermissionError: The token used for authentication does not have the
"perm_manage_tokens" attribute.
APIError: The API returned an unexpected error.
"""
url = f"{API_BASE_URL}/auth/tokens/{token_id}/"
_ = self.query("DELETE", url)
def list_token_policies(self, token_id: str) -> list[JsonTokenPolicyType]:
"""Return a list of all policies for the given token.
See https://desec.readthedocs.io/en/latest/auth/tokens.html#token-scoping-policies
Args:
token_id: The unique id of the token for which to get policies.
Returns:
A list of token policies for the given token. Each policy is returned as a
dictionary containing all available policy data.
Raises:
AuthenticationError: The token used for authentication is invalid.
TokenPermissionError: The token used for authentication does not have the
"perm_manage_tokens" attribute.
APIError: The API returned an unexpected error.
"""
url = f"{API_BASE_URL}/auth/tokens/{token_id}/policies/rrsets/"
data = self.query("GET", url)
return t.cast(list[JsonTokenPolicyType], data)
def add_token_policy(
self,
token_id: str,
domain: str | None = None,
subname: str | None = None,
rtype: str | None = None,
perm_write: bool = False,
) -> JsonTokenPolicyType:
"""Add a policy to the given token.
See https://desec.readthedocs.io/en/latest/auth/tokens.html#token-scoping-policies
Args:
token_id: The unique id of the token for which to add a policy.
domain: The domain to which the policy applies. `None` indicates the default
policy.
subname: DNS entry name. `None` indicates the default policy.
rtype: DNS record type. `None` indicates the default policy.
perm_write: Boolean indicating whether to allow or deny writes.
Returns:
A dictionary containing all data of the newly created token policy.
Raises:
AuthenticationError: The token used for authentication is invalid.
TokenPermissionError: The token used for authentication does not have the
"perm_manage_tokens" attribute.
ConflictError: There is a conflicting policy for this token, domain, subname
and type.
APIError: The API returned an unexpected error.
"""
url = f"{API_BASE_URL}/auth/tokens/{token_id}/policies/rrsets/"
request_data: JsonGenericType
request_data = {
"domain": domain,
"subname": subname,
"type": rtype,
"perm_write": perm_write,
}
data = self.query("POST", url, request_data)
return t.cast(JsonTokenPolicyType, data)
def modify_token_policy(
self,
token_id: str,
policy_id: str,
domain: str | None | t.Literal[False] = False,
subname: str | None | t.Literal[False] = False,
rtype: str | None | t.Literal[False] = False,
perm_write: bool | None = None,
) -> JsonTokenPolicyType:
"""Modify an existing policy for the given token.
See https://desec.readthedocs.io/en/latest/auth/tokens.html#token-scoping-policies
Args:
token_id: The unique id of the token for which to modify a policy.
policy_id: The unique id of the policy to modify.
domain: Set the domain to which the policy applies. `None` indicates the
default policy. `False` leaves the value unchanged.
subname: Set the DNS entry name. `None` indicates the default policy. `False`
leaves the value unchanged.
rtype: Set the DNS record type. `None` indicates the default policy. `False`
leaves the value unchanged.
perm_write: Boolean indicating whether to allow or deny writes. `None` leaves
the value unchanged.
Returns:
A dictionary containing all data of the modified token policy.
Raises:
AuthenticationError: The token used for authentication is invalid.
TokenPermissionError: The token used for authentication does not have the
"perm_manage_tokens" attribute.
ConflictError: There is a conflicting policy for this token, domain, subname
and type.
APIError: The API returned an unexpected error.
"""
url = f"{API_BASE_URL}/auth/tokens/{token_id}/policies/rrsets/{policy_id}/"
request_data: JsonGenericType
request_data = {}
if domain is not False:
request_data["domain"] = domain
if subname is not False:
request_data["subname"] = subname
if rtype is not False:
request_data["type"] = rtype
if perm_write is not None:
request_data["perm_write"] = perm_write
data = self.query("PATCH", url, request_data)
return t.cast(JsonTokenPolicyType, data)
def delete_token_policy(self, token_id: str, policy_id: str) -> None:
"""Delete an existing policy for the given token.
See https://desec.readthedocs.io/en/latest/auth/tokens.html#token-scoping-policies
Args:
token_id: The unique id of the token for which to delete a policy.
policy_id: The unique id of the policy to delete.
Raises:
AuthenticationError: The token used for authentication is invalid.
TokenPermissionError: The token used for authentication does not have the
"perm_manage_tokens" attribute.
APIError: The API returned an unexpected error.
"""
url = f"{API_BASE_URL}/auth/tokens/{token_id}/policies/rrsets/{policy_id}/"
_ = self.query("DELETE", url)
def list_domains(self) -> list[JsonDomainType]:
"""Return a list of all registered domains.
See https://desec.readthedocs.io/en/latest/dns/domains.html#listing-domains
Returns:
A list of all registered domains for the current account, including basic
metadata.
Raises:
AuthenticationError: The token used for authentication is invalid.
APIError: The API returned an unexpected error.
"""
url = f"{API_BASE_URL}/domains/"
data = self.query("GET", url)
return t.cast(list[JsonDomainType], data)
def domain_info(self, domain: str) -> JsonDomainWithKeysType:
"""Return basic information about a domain.
See https://desec.readthedocs.io/en/latest/dns/domains.html#retrieving-a-specific-domain
Args:
domain: The name of the domain to retrieve.
Returns:
A dictionary containing all metadata for the given domain including DNSSEC key
information.
Raises:
AuthenticationError: The token used for authentication is invalid.
NotFoundError: The given domain was not found in the current account.
APIError: The API returned an unexpected error.
"""
url = f"{API_BASE_URL}/domains/{domain}/"
data = self.query("GET", url)
return t.cast(JsonDomainWithKeysType, data)
def new_domain(self, domain: str) -> JsonDomainWithKeysType:
"""Create a new domain.
See https://desec.readthedocs.io/en/latest/dns/domains.html#creating-a-domain
Args:
domain: The name of the domain to create.
Returns:
A dictionary containing all metadata for the newly created domain including
DNSSEC key information.
Raises:
AuthenticationError: The token used for authentication is invalid.
ParameterError: The given domain name is incorrect, conflicts with an existing
domain or is disallowed by policy.
TokenPermissionError: The token used for authentication can not create domains
or the maximum number of domains for the current account has been reached.
APIError: The API returned an unexpected error.
"""
url = f"{API_BASE_URL}/domains/"
data = self.query("POST", url, data={"name": domain})