Bump dnspython from 2.3.0 to 2.4.2 (#2123)

* Bump dnspython from 2.3.0 to 2.4.2

Bumps [dnspython](https://github.com/rthalley/dnspython) from 2.3.0 to 2.4.2.
- [Release notes](https://github.com/rthalley/dnspython/releases)
- [Changelog](https://github.com/rthalley/dnspython/blob/master/doc/whatsnew.rst)
- [Commits](https://github.com/rthalley/dnspython/compare/v2.3.0...v2.4.2)

---
updated-dependencies:
- dependency-name: dnspython
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Update dnspython==2.4.2

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com>

[skip ci]
This commit is contained in:
dependabot[bot] 2023-08-24 12:05:11 -07:00 committed by GitHub
parent 9f00f5dafa
commit c0aa4e4996
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
108 changed files with 2985 additions and 1136 deletions

View file

@ -17,50 +17,44 @@
"""Common DNSSEC-related functions and constants."""
from typing import Any, cast, Dict, List, Optional, Set, Tuple, Union
import base64
import contextlib
import functools
import hashlib
import math
import struct
import time
import base64
from datetime import datetime
from dns.dnssectypes import Algorithm, DSDigest, NSEC3Hash
from typing import Callable, Dict, List, Optional, Set, Tuple, Union, cast
import dns.exception
import dns.name
import dns.node
import dns.rdataset
import dns.rdata
import dns.rdatatype
import dns.rdataclass
import dns.rdataset
import dns.rdatatype
import dns.rrset
import dns.transaction
import dns.zone
from dns.dnssectypes import Algorithm, DSDigest, NSEC3Hash
from dns.exception import ( # pylint: disable=W0611
AlgorithmKeyMismatch,
DeniedByPolicy,
UnsupportedAlgorithm,
ValidationFailure,
)
from dns.rdtypes.ANY.CDNSKEY import CDNSKEY
from dns.rdtypes.ANY.CDS import CDS
from dns.rdtypes.ANY.DNSKEY import DNSKEY
from dns.rdtypes.ANY.DS import DS
from dns.rdtypes.ANY.NSEC import NSEC, Bitmap
from dns.rdtypes.ANY.NSEC3PARAM import NSEC3PARAM
from dns.rdtypes.ANY.RRSIG import RRSIG, sigtime_to_posixtime
from dns.rdtypes.dnskeybase import Flag
class UnsupportedAlgorithm(dns.exception.DNSException):
"""The DNSSEC algorithm is not supported."""
class AlgorithmKeyMismatch(UnsupportedAlgorithm):
"""The DNSSEC algorithm is not supported for the given key type."""
class ValidationFailure(dns.exception.DNSException):
"""The DNSSEC signature is invalid."""
class DeniedByPolicy(dns.exception.DNSException):
"""Denied by DNSSEC policy."""
PublicKey = Union[
"GenericPublicKey",
"rsa.RSAPublicKey",
"ec.EllipticCurvePublicKey",
"ed25519.Ed25519PublicKey",
@ -68,12 +62,15 @@ PublicKey = Union[
]
PrivateKey = Union[
"GenericPrivateKey",
"rsa.RSAPrivateKey",
"ec.EllipticCurvePrivateKey",
"ed25519.Ed25519PrivateKey",
"ed448.Ed448PrivateKey",
]
RRsetSigner = Callable[[dns.transaction.Transaction, dns.rrset.RRset], None]
def algorithm_from_text(text: str) -> Algorithm:
"""Convert text into a DNSSEC algorithm value.
@ -308,113 +305,13 @@ def _find_candidate_keys(
return [
cast(DNSKEY, rd)
for rd in rdataset
if rd.algorithm == rrsig.algorithm and key_id(rd) == rrsig.key_tag
if rd.algorithm == rrsig.algorithm
and key_id(rd) == rrsig.key_tag
and (rd.flags & Flag.ZONE) == Flag.ZONE # RFC 4034 2.1.1
and rd.protocol == 3 # RFC 4034 2.1.2
]
def _is_rsa(algorithm: int) -> bool:
return algorithm in (
Algorithm.RSAMD5,
Algorithm.RSASHA1,
Algorithm.RSASHA1NSEC3SHA1,
Algorithm.RSASHA256,
Algorithm.RSASHA512,
)
def _is_dsa(algorithm: int) -> bool:
return algorithm in (Algorithm.DSA, Algorithm.DSANSEC3SHA1)
def _is_ecdsa(algorithm: int) -> bool:
return algorithm in (Algorithm.ECDSAP256SHA256, Algorithm.ECDSAP384SHA384)
def _is_eddsa(algorithm: int) -> bool:
return algorithm in (Algorithm.ED25519, Algorithm.ED448)
def _is_gost(algorithm: int) -> bool:
return algorithm == Algorithm.ECCGOST
def _is_md5(algorithm: int) -> bool:
return algorithm == Algorithm.RSAMD5
def _is_sha1(algorithm: int) -> bool:
return algorithm in (
Algorithm.DSA,
Algorithm.RSASHA1,
Algorithm.DSANSEC3SHA1,
Algorithm.RSASHA1NSEC3SHA1,
)
def _is_sha256(algorithm: int) -> bool:
return algorithm in (Algorithm.RSASHA256, Algorithm.ECDSAP256SHA256)
def _is_sha384(algorithm: int) -> bool:
return algorithm == Algorithm.ECDSAP384SHA384
def _is_sha512(algorithm: int) -> bool:
return algorithm == Algorithm.RSASHA512
def _ensure_algorithm_key_combination(algorithm: int, key: PublicKey) -> None:
"""Ensure algorithm is valid for key type, throwing an exception on
mismatch."""
if isinstance(key, rsa.RSAPublicKey):
if _is_rsa(algorithm):
return
raise AlgorithmKeyMismatch('algorithm "%s" not valid for RSA key' % algorithm)
if isinstance(key, dsa.DSAPublicKey):
if _is_dsa(algorithm):
return
raise AlgorithmKeyMismatch('algorithm "%s" not valid for DSA key' % algorithm)
if isinstance(key, ec.EllipticCurvePublicKey):
if _is_ecdsa(algorithm):
return
raise AlgorithmKeyMismatch('algorithm "%s" not valid for ECDSA key' % algorithm)
if isinstance(key, ed25519.Ed25519PublicKey):
if algorithm == Algorithm.ED25519:
return
raise AlgorithmKeyMismatch(
'algorithm "%s" not valid for ED25519 key' % algorithm
)
if isinstance(key, ed448.Ed448PublicKey):
if algorithm == Algorithm.ED448:
return
raise AlgorithmKeyMismatch('algorithm "%s" not valid for ED448 key' % algorithm)
raise TypeError("unsupported key type")
def _make_hash(algorithm: int) -> Any:
if _is_md5(algorithm):
return hashes.MD5()
if _is_sha1(algorithm):
return hashes.SHA1()
if _is_sha256(algorithm):
return hashes.SHA256()
if _is_sha384(algorithm):
return hashes.SHA384()
if _is_sha512(algorithm):
return hashes.SHA512()
if algorithm == Algorithm.ED25519:
return hashes.SHA512()
if algorithm == Algorithm.ED448:
return hashes.SHAKE256(114)
raise ValidationFailure("unknown hash for algorithm %u" % algorithm)
def _bytes_to_long(b: bytes) -> int:
return int.from_bytes(b, "big")
def _get_rrname_rdataset(
rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
) -> Tuple[dns.name.Name, dns.rdataset.Rdataset]:
@ -424,85 +321,13 @@ def _get_rrname_rdataset(
return rrset.name, rrset
def _validate_signature(sig: bytes, data: bytes, key: DNSKEY, chosen_hash: Any) -> None:
keyptr: bytes
if _is_rsa(key.algorithm):
# we ignore because mypy is confused and thinks key.key is a str for unknown
# reasons.
keyptr = key.key
(bytes_,) = struct.unpack("!B", keyptr[0:1])
keyptr = keyptr[1:]
if bytes_ == 0:
(bytes_,) = struct.unpack("!H", keyptr[0:2])
keyptr = keyptr[2:]
rsa_e = keyptr[0:bytes_]
rsa_n = keyptr[bytes_:]
try:
rsa_public_key = rsa.RSAPublicNumbers(
_bytes_to_long(rsa_e), _bytes_to_long(rsa_n)
).public_key(default_backend())
except ValueError:
raise ValidationFailure("invalid public key")
rsa_public_key.verify(sig, data, padding.PKCS1v15(), chosen_hash)
elif _is_dsa(key.algorithm):
keyptr = key.key
(t,) = struct.unpack("!B", keyptr[0:1])
keyptr = keyptr[1:]
octets = 64 + t * 8
dsa_q = keyptr[0:20]
keyptr = keyptr[20:]
dsa_p = keyptr[0:octets]
keyptr = keyptr[octets:]
dsa_g = keyptr[0:octets]
keyptr = keyptr[octets:]
dsa_y = keyptr[0:octets]
try:
dsa_public_key = dsa.DSAPublicNumbers( # type: ignore
_bytes_to_long(dsa_y),
dsa.DSAParameterNumbers(
_bytes_to_long(dsa_p), _bytes_to_long(dsa_q), _bytes_to_long(dsa_g)
),
).public_key(default_backend())
except ValueError:
raise ValidationFailure("invalid public key")
dsa_public_key.verify(sig, data, chosen_hash)
elif _is_ecdsa(key.algorithm):
keyptr = key.key
curve: Any
if key.algorithm == Algorithm.ECDSAP256SHA256:
curve = ec.SECP256R1()
octets = 32
else:
curve = ec.SECP384R1()
octets = 48
ecdsa_x = keyptr[0:octets]
ecdsa_y = keyptr[octets : octets * 2]
try:
ecdsa_public_key = ec.EllipticCurvePublicNumbers(
curve=curve, x=_bytes_to_long(ecdsa_x), y=_bytes_to_long(ecdsa_y)
).public_key(default_backend())
except ValueError:
raise ValidationFailure("invalid public key")
ecdsa_public_key.verify(sig, data, ec.ECDSA(chosen_hash))
elif _is_eddsa(key.algorithm):
keyptr = key.key
loader: Any
if key.algorithm == Algorithm.ED25519:
loader = ed25519.Ed25519PublicKey
else:
loader = ed448.Ed448PublicKey
try:
eddsa_public_key = loader.from_public_bytes(keyptr)
except ValueError:
raise ValidationFailure("invalid public key")
eddsa_public_key.verify(sig, data)
elif _is_gost(key.algorithm):
raise UnsupportedAlgorithm(
'algorithm "%s" not supported by dnspython'
% algorithm_to_text(key.algorithm)
)
else:
raise ValidationFailure("unknown algorithm %u" % key.algorithm)
def _validate_signature(sig: bytes, data: bytes, key: DNSKEY) -> None:
public_cls = get_algorithm_cls_from_dnskey(key).public_cls
try:
public_key = public_cls.from_dnskey(key)
except ValueError:
raise ValidationFailure("invalid public key")
public_key.verify(sig, data)
def _validate_rrsig(
@ -559,29 +384,13 @@ def _validate_rrsig(
if rrsig.inception > now:
raise ValidationFailure("not yet valid")
if _is_dsa(rrsig.algorithm):
sig_r = rrsig.signature[1:21]
sig_s = rrsig.signature[21:]
sig = utils.encode_dss_signature(_bytes_to_long(sig_r), _bytes_to_long(sig_s))
elif _is_ecdsa(rrsig.algorithm):
if rrsig.algorithm == Algorithm.ECDSAP256SHA256:
octets = 32
else:
octets = 48
sig_r = rrsig.signature[0:octets]
sig_s = rrsig.signature[octets:]
sig = utils.encode_dss_signature(_bytes_to_long(sig_r), _bytes_to_long(sig_s))
else:
sig = rrsig.signature
data = _make_rrsig_signature_data(rrset, rrsig, origin)
chosen_hash = _make_hash(rrsig.algorithm)
for candidate_key in candidate_keys:
if not policy.ok_to_validate(candidate_key):
continue
try:
_validate_signature(sig, data, candidate_key, chosen_hash)
_validate_signature(rrsig.signature, data, candidate_key)
return
except (InvalidSignature, ValidationFailure):
# this happens on an individual validation failure
@ -673,6 +482,7 @@ def _sign(
lifetime: Optional[int] = None,
verify: bool = False,
policy: Optional[Policy] = None,
origin: Optional[dns.name.Name] = None,
) -> RRSIG:
"""Sign RRset using private key.
@ -708,6 +518,10 @@ def _sign(
*policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy,
``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
*origin*, a ``dns.name.Name`` or ``None``. If ``None``, the default, then all
names in the rrset (including its owner name) must be absolute; otherwise the
specified origin will be used to make names absolute when signing.
Raises ``DeniedByPolicy`` if the signature is denied by policy.
"""
@ -735,16 +549,26 @@ def _sign(
if expiration is not None:
rrsig_expiration = to_timestamp(expiration)
elif lifetime is not None:
rrsig_expiration = int(time.time()) + lifetime
rrsig_expiration = rrsig_inception + lifetime
else:
raise ValueError("expiration or lifetime must be specified")
# Derelativize now because we need a correct labels length for the
# rrsig_template.
if origin is not None:
rrname = rrname.derelativize(origin)
labels = len(rrname) - 1
# Adjust labels appropriately for wildcards.
if rrname.is_wild():
labels -= 1
rrsig_template = RRSIG(
rdclass=rdclass,
rdtype=dns.rdatatype.RRSIG,
type_covered=rdtype,
algorithm=dnskey.algorithm,
labels=len(rrname) - 1,
labels=labels,
original_ttl=original_ttl,
expiration=rrsig_expiration,
inception=rrsig_inception,
@ -753,63 +577,18 @@ def _sign(
signature=b"",
)
data = dns.dnssec._make_rrsig_signature_data(rrset, rrsig_template)
chosen_hash = _make_hash(rrsig_template.algorithm)
signature = None
data = dns.dnssec._make_rrsig_signature_data(rrset, rrsig_template, origin)
if isinstance(private_key, rsa.RSAPrivateKey):
if not _is_rsa(dnskey.algorithm):
raise ValueError("Invalid DNSKEY algorithm for RSA key")
signature = private_key.sign(data, padding.PKCS1v15(), chosen_hash)
if verify:
private_key.public_key().verify(
signature, data, padding.PKCS1v15(), chosen_hash
)
elif isinstance(private_key, dsa.DSAPrivateKey):
if not _is_dsa(dnskey.algorithm):
raise ValueError("Invalid DNSKEY algorithm for DSA key")
public_dsa_key = private_key.public_key()
if public_dsa_key.key_size > 1024:
raise ValueError("DSA key size overflow")
der_signature = private_key.sign(data, chosen_hash)
if verify:
public_dsa_key.verify(der_signature, data, chosen_hash)
dsa_r, dsa_s = utils.decode_dss_signature(der_signature)
dsa_t = (public_dsa_key.key_size // 8 - 64) // 8
octets = 20
signature = (
struct.pack("!B", dsa_t)
+ int.to_bytes(dsa_r, length=octets, byteorder="big")
+ int.to_bytes(dsa_s, length=octets, byteorder="big")
)
elif isinstance(private_key, ec.EllipticCurvePrivateKey):
if not _is_ecdsa(dnskey.algorithm):
raise ValueError("Invalid DNSKEY algorithm for EC key")
der_signature = private_key.sign(data, ec.ECDSA(chosen_hash))
if verify:
private_key.public_key().verify(der_signature, data, ec.ECDSA(chosen_hash))
if dnskey.algorithm == Algorithm.ECDSAP256SHA256:
octets = 32
else:
octets = 48
dsa_r, dsa_s = utils.decode_dss_signature(der_signature)
signature = int.to_bytes(dsa_r, length=octets, byteorder="big") + int.to_bytes(
dsa_s, length=octets, byteorder="big"
)
elif isinstance(private_key, ed25519.Ed25519PrivateKey):
if dnskey.algorithm != Algorithm.ED25519:
raise ValueError("Invalid DNSKEY algorithm for ED25519 key")
signature = private_key.sign(data)
if verify:
private_key.public_key().verify(signature, data)
elif isinstance(private_key, ed448.Ed448PrivateKey):
if dnskey.algorithm != Algorithm.ED448:
raise ValueError("Invalid DNSKEY algorithm for ED448 key")
signature = private_key.sign(data)
if verify:
private_key.public_key().verify(signature, data)
if isinstance(private_key, GenericPrivateKey):
signing_key = private_key
else:
raise TypeError("Unsupported key algorithm")
try:
private_cls = get_algorithm_cls_from_dnskey(dnskey)
signing_key = private_cls(key=private_key)
except UnsupportedAlgorithm:
raise TypeError("Unsupported key algorithm")
signature = signing_key.sign(data, verify)
return cast(RRSIG, rrsig_template.replace(signature=signature))
@ -858,9 +637,12 @@ def _make_rrsig_signature_data(
raise ValidationFailure("relative RR name without an origin specified")
rrname = rrname.derelativize(origin)
if len(rrname) - 1 < rrsig.labels:
name_len = len(rrname)
if rrname.is_wild() and rrsig.labels != name_len - 2:
raise ValidationFailure("wild owner name has wrong label length")
if name_len - 1 < rrsig.labels:
raise ValidationFailure("owner name longer than RRSIG labels")
elif rrsig.labels < len(rrname) - 1:
elif rrsig.labels < name_len - 1:
suffix = rrname.split(rrsig.labels + 1)[1]
rrname = dns.name.from_text("*", suffix)
rrnamebuf = rrname.to_digestable()
@ -884,9 +666,8 @@ def _make_dnskey(
) -> DNSKEY:
"""Convert a public key to DNSKEY Rdata
*public_key*, the public key to convert, a
``cryptography.hazmat.primitives.asymmetric`` public key class applicable
for DNSSEC.
*public_key*, a ``PublicKey`` (``GenericPublicKey`` or
``cryptography.hazmat.primitives.asymmetric``) to convert.
*algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm.
@ -902,72 +683,13 @@ def _make_dnskey(
Return DNSKEY ``Rdata``.
"""
def encode_rsa_public_key(public_key: "rsa.RSAPublicKey") -> bytes:
"""Encode a public key per RFC 3110, section 2."""
pn = public_key.public_numbers()
_exp_len = math.ceil(int.bit_length(pn.e) / 8)
exp = int.to_bytes(pn.e, length=_exp_len, byteorder="big")
if _exp_len > 255:
exp_header = b"\0" + struct.pack("!H", _exp_len)
else:
exp_header = struct.pack("!B", _exp_len)
if pn.n.bit_length() < 512 or pn.n.bit_length() > 4096:
raise ValueError("unsupported RSA key length")
return exp_header + exp + pn.n.to_bytes((pn.n.bit_length() + 7) // 8, "big")
algorithm = Algorithm.make(algorithm)
def encode_dsa_public_key(public_key: "dsa.DSAPublicKey") -> bytes:
"""Encode a public key per RFC 2536, section 2."""
pn = public_key.public_numbers()
dsa_t = (public_key.key_size // 8 - 64) // 8
if dsa_t > 8:
raise ValueError("unsupported DSA key size")
octets = 64 + dsa_t * 8
res = struct.pack("!B", dsa_t)
res += pn.parameter_numbers.q.to_bytes(20, "big")
res += pn.parameter_numbers.p.to_bytes(octets, "big")
res += pn.parameter_numbers.g.to_bytes(octets, "big")
res += pn.y.to_bytes(octets, "big")
return res
def encode_ecdsa_public_key(public_key: "ec.EllipticCurvePublicKey") -> bytes:
"""Encode a public key per RFC 6605, section 4."""
pn = public_key.public_numbers()
if isinstance(public_key.curve, ec.SECP256R1):
return pn.x.to_bytes(32, "big") + pn.y.to_bytes(32, "big")
elif isinstance(public_key.curve, ec.SECP384R1):
return pn.x.to_bytes(48, "big") + pn.y.to_bytes(48, "big")
else:
raise ValueError("unsupported ECDSA curve")
the_algorithm = Algorithm.make(algorithm)
_ensure_algorithm_key_combination(the_algorithm, public_key)
if isinstance(public_key, rsa.RSAPublicKey):
key_bytes = encode_rsa_public_key(public_key)
elif isinstance(public_key, dsa.DSAPublicKey):
key_bytes = encode_dsa_public_key(public_key)
elif isinstance(public_key, ec.EllipticCurvePublicKey):
key_bytes = encode_ecdsa_public_key(public_key)
elif isinstance(public_key, ed25519.Ed25519PublicKey):
key_bytes = public_key.public_bytes(
encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw
)
elif isinstance(public_key, ed448.Ed448PublicKey):
key_bytes = public_key.public_bytes(
encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw
)
if isinstance(public_key, GenericPublicKey):
return public_key.to_dnskey(flags=flags, protocol=protocol)
else:
raise TypeError("unsupported key algorithm")
return DNSKEY(
rdclass=dns.rdataclass.IN,
rdtype=dns.rdatatype.DNSKEY,
flags=flags,
protocol=protocol,
algorithm=the_algorithm,
key=key_bytes,
)
public_cls = get_algorithm_cls(algorithm).public_cls
return public_cls(key=public_key).to_dnskey(flags=flags, protocol=protocol)
def _make_cdnskey(
@ -1216,23 +938,252 @@ def dnskey_rdataset_to_cdnskey_rdataset(
return dns.rdataset.from_rdata_list(rdataset.ttl, res)
def default_rrset_signer(
txn: dns.transaction.Transaction,
rrset: dns.rrset.RRset,
signer: dns.name.Name,
ksks: List[Tuple[PrivateKey, DNSKEY]],
zsks: List[Tuple[PrivateKey, DNSKEY]],
inception: Optional[Union[datetime, str, int, float]] = None,
expiration: Optional[Union[datetime, str, int, float]] = None,
lifetime: Optional[int] = None,
policy: Optional[Policy] = None,
origin: Optional[dns.name.Name] = None,
) -> None:
"""Default RRset signer"""
if rrset.rdtype in set(
[
dns.rdatatype.RdataType.DNSKEY,
dns.rdatatype.RdataType.CDS,
dns.rdatatype.RdataType.CDNSKEY,
]
):
keys = ksks
else:
keys = zsks
for private_key, dnskey in keys:
rrsig = dns.dnssec.sign(
rrset=rrset,
private_key=private_key,
dnskey=dnskey,
inception=inception,
expiration=expiration,
lifetime=lifetime,
signer=signer,
policy=policy,
origin=origin,
)
txn.add(rrset.name, rrset.ttl, rrsig)
def sign_zone(
zone: dns.zone.Zone,
txn: Optional[dns.transaction.Transaction] = None,
keys: Optional[List[Tuple[PrivateKey, DNSKEY]]] = None,
add_dnskey: bool = True,
dnskey_ttl: Optional[int] = None,
inception: Optional[Union[datetime, str, int, float]] = None,
expiration: Optional[Union[datetime, str, int, float]] = None,
lifetime: Optional[int] = None,
nsec3: Optional[NSEC3PARAM] = None,
rrset_signer: Optional[RRsetSigner] = None,
policy: Optional[Policy] = None,
) -> None:
"""Sign zone.
*zone*, a ``dns.zone.Zone``, the zone to sign.
*txn*, a ``dns.transaction.Transaction``, an optional transaction to use for
signing.
*keys*, a list of (``PrivateKey``, ``DNSKEY``) tuples, to use for signing. KSK/ZSK
roles are assigned automatically if the SEP flag is used, otherwise all RRsets are
signed by all keys.
*add_dnskey*, a ``bool``. If ``True``, the default, all specified DNSKEYs are
automatically added to the zone on signing.
*dnskey_ttl*, a``int``, specifies the TTL for DNSKEY RRs. If not specified the TTL
of the existing DNSKEY RRset used or the TTL of the SOA RRset.
*inception*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the signature
inception time. If ``None``, the current time is used. If a ``str``, the format is
"YYYYMMDDHHMMSS" or alternatively the number of seconds since the UNIX epoch in text
form; this is the same the RRSIG rdata's text form. Values of type `int` or `float`
are interpreted as seconds since the UNIX epoch.
*expiration*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the signature
expiration time. If ``None``, the expiration time will be the inception time plus
the value of the *lifetime* parameter. See the description of *inception* above for
how the various parameter types are interpreted.
*lifetime*, an ``int`` or ``None``, the signature lifetime in seconds. This
parameter is only meaningful if *expiration* is ``None``.
*nsec3*, a ``NSEC3PARAM`` Rdata, configures signing using NSEC3. Not yet
implemented.
*rrset_signer*, a ``Callable``, an optional function for signing RRsets. The
function requires two arguments: transaction and RRset. If the not specified,
``dns.dnssec.default_rrset_signer`` will be used.
Returns ``None``.
"""
ksks = []
zsks = []
# if we have both KSKs and ZSKs, split by SEP flag. if not, sign all
# records with all keys
if keys:
for key in keys:
if key[1].flags & Flag.SEP:
ksks.append(key)
else:
zsks.append(key)
if not ksks:
ksks = keys
if not zsks:
zsks = keys
else:
keys = []
if txn:
cm: contextlib.AbstractContextManager = contextlib.nullcontext(txn)
else:
cm = zone.writer()
with cm as _txn:
if add_dnskey:
if dnskey_ttl is None:
dnskey = _txn.get(zone.origin, dns.rdatatype.DNSKEY)
if dnskey:
dnskey_ttl = dnskey.ttl
else:
soa = _txn.get(zone.origin, dns.rdatatype.SOA)
dnskey_ttl = soa.ttl
for _, dnskey in keys:
_txn.add(zone.origin, dnskey_ttl, dnskey)
if nsec3:
raise NotImplementedError("Signing with NSEC3 not yet implemented")
else:
_rrset_signer = rrset_signer or functools.partial(
default_rrset_signer,
signer=zone.origin,
ksks=ksks,
zsks=zsks,
inception=inception,
expiration=expiration,
lifetime=lifetime,
policy=policy,
origin=zone.origin,
)
return _sign_zone_nsec(zone, _txn, _rrset_signer)
def _sign_zone_nsec(
zone: dns.zone.Zone,
txn: dns.transaction.Transaction,
rrset_signer: Optional[RRsetSigner] = None,
) -> None:
"""NSEC zone signer"""
def _txn_add_nsec(
txn: dns.transaction.Transaction,
name: dns.name.Name,
next_secure: Optional[dns.name.Name],
rdclass: dns.rdataclass.RdataClass,
ttl: int,
rrset_signer: Optional[RRsetSigner] = None,
) -> None:
"""NSEC zone signer helper"""
mandatory_types = set(
[dns.rdatatype.RdataType.RRSIG, dns.rdatatype.RdataType.NSEC]
)
node = txn.get_node(name)
if node and next_secure:
types = (
set([rdataset.rdtype for rdataset in node.rdatasets]) | mandatory_types
)
windows = Bitmap.from_rdtypes(list(types))
rrset = dns.rrset.from_rdata(
name,
ttl,
NSEC(
rdclass=rdclass,
rdtype=dns.rdatatype.RdataType.NSEC,
next=next_secure,
windows=windows,
),
)
txn.add(rrset)
if rrset_signer:
rrset_signer(txn, rrset)
rrsig_ttl = zone.get_soa().minimum
delegation = None
last_secure = None
for name in sorted(txn.iterate_names()):
if delegation and name.is_subdomain(delegation):
# names below delegations are not secure
continue
elif txn.get(name, dns.rdatatype.NS) and name != zone.origin:
# inside delegation
delegation = name
else:
# outside delegation
delegation = None
if rrset_signer:
node = txn.get_node(name)
if node:
for rdataset in node.rdatasets:
if rdataset.rdtype == dns.rdatatype.RRSIG:
# do not sign RRSIGs
continue
elif delegation and rdataset.rdtype != dns.rdatatype.DS:
# do not sign delegations except DS records
continue
else:
rrset = dns.rrset.from_rdata(name, rdataset.ttl, *rdataset)
rrset_signer(txn, rrset)
# We need "is not None" as the empty name is False because its length is 0.
if last_secure is not None:
_txn_add_nsec(txn, last_secure, name, zone.rdclass, rrsig_ttl, rrset_signer)
last_secure = name
if last_secure:
_txn_add_nsec(
txn, last_secure, zone.origin, zone.rdclass, rrsig_ttl, rrset_signer
)
def _need_pyca(*args, **kwargs):
raise ImportError(
"DNSSEC validation requires " + "python cryptography"
"DNSSEC validation requires python cryptography"
) # pragma: no cover
try:
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import utils
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives.asymmetric import ed448
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa # pylint: disable=W0611
from cryptography.hazmat.primitives.asymmetric import ec # pylint: disable=W0611
from cryptography.hazmat.primitives.asymmetric import ed448 # pylint: disable=W0611
from cryptography.hazmat.primitives.asymmetric import rsa # pylint: disable=W0611
from cryptography.hazmat.primitives.asymmetric import ( # pylint: disable=W0611
ed25519,
)
from dns.dnssecalgs import ( # pylint: disable=C0412
get_algorithm_cls,
get_algorithm_cls_from_dnskey,
)
from dns.dnssecalgs.base import GenericPrivateKey, GenericPublicKey
except ImportError: # pragma: no cover
validate = _need_pyca
validate_rrsig = _need_pyca