Bump dnspython from 2.2.1 to 2.3.0 (#1975)

* Bump dnspython from 2.2.1 to 2.3.0

Bumps [dnspython](https://github.com/rthalley/dnspython) from 2.2.1 to 2.3.0.
- [Release notes](https://github.com/rthalley/dnspython/releases)
- [Changelog](https://github.com/rthalley/dnspython/blob/master/doc/whatsnew.rst)
- [Commits](https://github.com/rthalley/dnspython/compare/v2.2.1...v2.3.0)

---
updated-dependencies:
- dependency-name: dnspython
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Update dnspython==2.3.0

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com>

[skip ci]
This commit is contained in:
dependabot[bot] 2023-03-02 20:54:32 -08:00 committed by GitHub
commit 32c06a8b72
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
137 changed files with 7699 additions and 4277 deletions

View file

@ -23,7 +23,7 @@ import dns.rdtypes.util
class Relay(dns.rdtypes.util.Gateway):
name = 'AMTRELAY relay'
name = "AMTRELAY relay"
@property
def relay(self):
@ -37,10 +37,11 @@ class AMTRELAY(dns.rdata.Rdata):
# see: RFC 8777
__slots__ = ['precedence', 'discovery_optional', 'relay_type', 'relay']
__slots__ = ["precedence", "discovery_optional", "relay_type", "relay"]
def __init__(self, rdclass, rdtype, precedence, discovery_optional,
relay_type, relay):
def __init__(
self, rdclass, rdtype, precedence, discovery_optional, relay_type, relay
):
super().__init__(rdclass, rdtype)
relay = Relay(relay_type, relay)
self.precedence = self._as_uint8(precedence)
@ -50,37 +51,42 @@ class AMTRELAY(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
relay = Relay(self.relay_type, self.relay).to_text(origin, relativize)
return '%d %d %d %s' % (self.precedence, self.discovery_optional,
self.relay_type, relay)
return "%d %d %d %s" % (
self.precedence,
self.discovery_optional,
self.relay_type,
relay,
)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
precedence = tok.get_uint8()
discovery_optional = tok.get_uint8()
if discovery_optional > 1:
raise dns.exception.SyntaxError('expecting 0 or 1')
raise dns.exception.SyntaxError("expecting 0 or 1")
discovery_optional = bool(discovery_optional)
relay_type = tok.get_uint8()
if relay_type > 0x7f:
raise dns.exception.SyntaxError('expecting an integer <= 127')
relay = Relay.from_text(relay_type, tok, origin, relativize,
relativize_to)
return cls(rdclass, rdtype, precedence, discovery_optional, relay_type,
relay.relay)
if relay_type > 0x7F:
raise dns.exception.SyntaxError("expecting an integer <= 127")
relay = Relay.from_text(relay_type, tok, origin, relativize, relativize_to)
return cls(
rdclass, rdtype, precedence, discovery_optional, relay_type, relay.relay
)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
relay_type = self.relay_type | (self.discovery_optional << 7)
header = struct.pack("!BB", self.precedence, relay_type)
file.write(header)
Relay(self.relay_type, self.relay).to_wire(file, compress, origin,
canonicalize)
Relay(self.relay_type, self.relay).to_wire(file, compress, origin, canonicalize)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(precedence, relay_type) = parser.get_struct('!BB')
(precedence, relay_type) = parser.get_struct("!BB")
discovery_optional = bool(relay_type >> 7)
relay_type &= 0x7f
relay_type &= 0x7F
relay = Relay.from_wire_parser(relay_type, parser, origin)
return cls(rdclass, rdtype, precedence, discovery_optional, relay_type,
relay.relay)
return cls(
rdclass, rdtype, precedence, discovery_optional, relay_type, relay.relay
)

View file

@ -30,7 +30,7 @@ class CAA(dns.rdata.Rdata):
# see: RFC 6844
__slots__ = ['flags', 'tag', 'value']
__slots__ = ["flags", "tag", "value"]
def __init__(self, rdclass, rdtype, flags, tag, value):
super().__init__(rdclass, rdtype)
@ -41,23 +41,26 @@ class CAA(dns.rdata.Rdata):
self.value = self._as_bytes(value)
def to_text(self, origin=None, relativize=True, **kw):
return '%u %s "%s"' % (self.flags,
dns.rdata._escapify(self.tag),
dns.rdata._escapify(self.value))
return '%u %s "%s"' % (
self.flags,
dns.rdata._escapify(self.tag),
dns.rdata._escapify(self.value),
)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
flags = tok.get_uint8()
tag = tok.get_string().encode()
value = tok.get_string().encode()
return cls(rdclass, rdtype, flags, tag, value)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(struct.pack('!B', self.flags))
file.write(struct.pack("!B", self.flags))
l = len(self.tag)
assert l < 256
file.write(struct.pack('!B', l))
file.write(struct.pack("!B", l))
file.write(self.tag)
file.write(self.value)

View file

@ -15,13 +15,19 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.dnskeybase
import dns.rdtypes.dnskeybase # lgtm[py/import-and-import-from]
import dns.immutable
# pylint: disable=unused-import
from dns.rdtypes.dnskeybase import SEP, REVOKE, ZONE # noqa: F401
from dns.rdtypes.dnskeybase import (
SEP,
REVOKE,
ZONE,
) # noqa: F401 lgtm[py/unused-import]
# pylint: enable=unused-import
@dns.immutable.immutable
class CDNSKEY(dns.rdtypes.dnskeybase.DNSKEYBase):

View file

@ -20,34 +20,34 @@ import base64
import dns.exception
import dns.immutable
import dns.dnssec
import dns.dnssectypes
import dns.rdata
import dns.tokenizer
_ctype_by_value = {
1: 'PKIX',
2: 'SPKI',
3: 'PGP',
4: 'IPKIX',
5: 'ISPKI',
6: 'IPGP',
7: 'ACPKIX',
8: 'IACPKIX',
253: 'URI',
254: 'OID',
1: "PKIX",
2: "SPKI",
3: "PGP",
4: "IPKIX",
5: "ISPKI",
6: "IPGP",
7: "ACPKIX",
8: "IACPKIX",
253: "URI",
254: "OID",
}
_ctype_by_name = {
'PKIX': 1,
'SPKI': 2,
'PGP': 3,
'IPKIX': 4,
'ISPKI': 5,
'IPGP': 6,
'ACPKIX': 7,
'IACPKIX': 8,
'URI': 253,
'OID': 254,
"PKIX": 1,
"SPKI": 2,
"PGP": 3,
"IPKIX": 4,
"ISPKI": 5,
"IPGP": 6,
"ACPKIX": 7,
"IACPKIX": 8,
"URI": 253,
"OID": 254,
}
@ -72,10 +72,11 @@ class CERT(dns.rdata.Rdata):
# see RFC 4398
__slots__ = ['certificate_type', 'key_tag', 'algorithm', 'certificate']
__slots__ = ["certificate_type", "key_tag", "algorithm", "certificate"]
def __init__(self, rdclass, rdtype, certificate_type, key_tag, algorithm,
certificate):
def __init__(
self, rdclass, rdtype, certificate_type, key_tag, algorithm, certificate
):
super().__init__(rdclass, rdtype)
self.certificate_type = self._as_uint16(certificate_type)
self.key_tag = self._as_uint16(key_tag)
@ -84,24 +85,28 @@ class CERT(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
certificate_type = _ctype_to_text(self.certificate_type)
return "%s %d %s %s" % (certificate_type, self.key_tag,
dns.dnssec.algorithm_to_text(self.algorithm),
dns.rdata._base64ify(self.certificate, **kw))
return "%s %d %s %s" % (
certificate_type,
self.key_tag,
dns.dnssectypes.Algorithm.to_text(self.algorithm),
dns.rdata._base64ify(self.certificate, **kw),
)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
certificate_type = _ctype_from_text(tok.get_string())
key_tag = tok.get_uint16()
algorithm = dns.dnssec.algorithm_from_text(tok.get_string())
algorithm = dns.dnssectypes.Algorithm.from_text(tok.get_string())
b64 = tok.concatenate_remaining_identifiers().encode()
certificate = base64.b64decode(b64)
return cls(rdclass, rdtype, certificate_type, key_tag,
algorithm, certificate)
return cls(rdclass, rdtype, certificate_type, key_tag, algorithm, certificate)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
prefix = struct.pack("!HHB", self.certificate_type, self.key_tag,
self.algorithm)
prefix = struct.pack(
"!HHB", self.certificate_type, self.key_tag, self.algorithm
)
file.write(prefix)
file.write(self.certificate)
@ -109,5 +114,4 @@ class CERT(dns.rdata.Rdata):
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(certificate_type, key_tag, algorithm) = parser.get_struct("!HHB")
certificate = parser.get_remaining()
return cls(rdclass, rdtype, certificate_type, key_tag, algorithm,
certificate)
return cls(rdclass, rdtype, certificate_type, key_tag, algorithm, certificate)

View file

@ -27,7 +27,7 @@ import dns.rdtypes.util
@dns.immutable.immutable
class Bitmap(dns.rdtypes.util.Bitmap):
type_name = 'CSYNC'
type_name = "CSYNC"
@dns.immutable.immutable
@ -35,7 +35,7 @@ class CSYNC(dns.rdata.Rdata):
"""CSYNC record"""
__slots__ = ['serial', 'flags', 'windows']
__slots__ = ["serial", "flags", "windows"]
def __init__(self, rdclass, rdtype, serial, flags, windows):
super().__init__(rdclass, rdtype)
@ -47,18 +47,19 @@ class CSYNC(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
text = Bitmap(self.windows).to_text()
return '%d %d%s' % (self.serial, self.flags, text)
return "%d %d%s" % (self.serial, self.flags, text)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
serial = tok.get_uint32()
flags = tok.get_uint16()
bitmap = Bitmap.from_text(tok)
return cls(rdclass, rdtype, serial, flags, bitmap)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(struct.pack('!IH', self.serial, self.flags))
file.write(struct.pack("!IH", self.serial, self.flags))
Bitmap(self.windows).to_wire(file)
@classmethod

View file

@ -15,13 +15,19 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.dnskeybase
import dns.rdtypes.dnskeybase # lgtm[py/import-and-import-from]
import dns.immutable
# pylint: disable=unused-import
from dns.rdtypes.dnskeybase import SEP, REVOKE, ZONE # noqa: F401
from dns.rdtypes.dnskeybase import (
SEP,
REVOKE,
ZONE,
) # noqa: F401 lgtm[py/unused-import]
# pylint: enable=unused-import
@dns.immutable.immutable
class DNSKEY(dns.rdtypes.dnskeybase.DNSKEYBase):

View file

@ -26,19 +26,19 @@ import dns.tokenizer
def _validate_float_string(what):
if len(what) == 0:
raise dns.exception.FormError
if what[0] == b'-'[0] or what[0] == b'+'[0]:
if what[0] == b"-"[0] or what[0] == b"+"[0]:
what = what[1:]
if what.isdigit():
return
try:
(left, right) = what.split(b'.')
(left, right) = what.split(b".")
except ValueError:
raise dns.exception.FormError
if left == b'' and right == b'':
if left == b"" and right == b"":
raise dns.exception.FormError
if not left == b'' and not left.decode().isdigit():
if not left == b"" and not left.decode().isdigit():
raise dns.exception.FormError
if not right == b'' and not right.decode().isdigit():
if not right == b"" and not right.decode().isdigit():
raise dns.exception.FormError
@ -49,18 +49,15 @@ class GPOS(dns.rdata.Rdata):
# see: RFC 1712
__slots__ = ['latitude', 'longitude', 'altitude']
__slots__ = ["latitude", "longitude", "altitude"]
def __init__(self, rdclass, rdtype, latitude, longitude, altitude):
super().__init__(rdclass, rdtype)
if isinstance(latitude, float) or \
isinstance(latitude, int):
if isinstance(latitude, float) or isinstance(latitude, int):
latitude = str(latitude)
if isinstance(longitude, float) or \
isinstance(longitude, int):
if isinstance(longitude, float) or isinstance(longitude, int):
longitude = str(longitude)
if isinstance(altitude, float) or \
isinstance(altitude, int):
if isinstance(altitude, float) or isinstance(altitude, int):
altitude = str(altitude)
latitude = self._as_bytes(latitude, True, 255)
longitude = self._as_bytes(longitude, True, 255)
@ -73,19 +70,20 @@ class GPOS(dns.rdata.Rdata):
self.altitude = altitude
flat = self.float_latitude
if flat < -90.0 or flat > 90.0:
raise dns.exception.FormError('bad latitude')
raise dns.exception.FormError("bad latitude")
flong = self.float_longitude
if flong < -180.0 or flong > 180.0:
raise dns.exception.FormError('bad longitude')
raise dns.exception.FormError("bad longitude")
def to_text(self, origin=None, relativize=True, **kw):
return '{} {} {}'.format(self.latitude.decode(),
self.longitude.decode(),
self.altitude.decode())
return "{} {} {}".format(
self.latitude.decode(), self.longitude.decode(), self.altitude.decode()
)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
latitude = tok.get_string()
longitude = tok.get_string()
altitude = tok.get_string()
@ -94,15 +92,15 @@ class GPOS(dns.rdata.Rdata):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
l = len(self.latitude)
assert l < 256
file.write(struct.pack('!B', l))
file.write(struct.pack("!B", l))
file.write(self.latitude)
l = len(self.longitude)
assert l < 256
file.write(struct.pack('!B', l))
file.write(struct.pack("!B", l))
file.write(self.longitude)
l = len(self.altitude)
assert l < 256
file.write(struct.pack('!B', l))
file.write(struct.pack("!B", l))
file.write(self.altitude)
@classmethod

View file

@ -30,7 +30,7 @@ class HINFO(dns.rdata.Rdata):
# see: RFC 1035
__slots__ = ['cpu', 'os']
__slots__ = ["cpu", "os"]
def __init__(self, rdclass, rdtype, cpu, os):
super().__init__(rdclass, rdtype)
@ -38,12 +38,14 @@ class HINFO(dns.rdata.Rdata):
self.os = self._as_bytes(os, True, 255)
def to_text(self, origin=None, relativize=True, **kw):
return '"{}" "{}"'.format(dns.rdata._escapify(self.cpu),
dns.rdata._escapify(self.os))
return '"{}" "{}"'.format(
dns.rdata._escapify(self.cpu), dns.rdata._escapify(self.os)
)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
cpu = tok.get_string(max_length=255)
os = tok.get_string(max_length=255)
return cls(rdclass, rdtype, cpu, os)
@ -51,11 +53,11 @@ class HINFO(dns.rdata.Rdata):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
l = len(self.cpu)
assert l < 256
file.write(struct.pack('!B', l))
file.write(struct.pack("!B", l))
file.write(self.cpu)
l = len(self.os)
assert l < 256
file.write(struct.pack('!B', l))
file.write(struct.pack("!B", l))
file.write(self.os)
@classmethod

View file

@ -32,7 +32,7 @@ class HIP(dns.rdata.Rdata):
# see: RFC 5205
__slots__ = ['hit', 'algorithm', 'key', 'servers']
__slots__ = ["hit", "algorithm", "key", "servers"]
def __init__(self, rdclass, rdtype, hit, algorithm, key, servers):
super().__init__(rdclass, rdtype)
@ -43,18 +43,19 @@ class HIP(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
hit = binascii.hexlify(self.hit).decode()
key = base64.b64encode(self.key).replace(b'\n', b'').decode()
text = ''
key = base64.b64encode(self.key).replace(b"\n", b"").decode()
text = ""
servers = []
for server in self.servers:
servers.append(server.choose_relativity(origin, relativize))
if len(servers) > 0:
text += (' ' + ' '.join((x.to_unicode() for x in servers)))
return '%u %s %s%s' % (self.algorithm, hit, key, text)
text += " " + " ".join((x.to_unicode() for x in servers))
return "%u %s %s%s" % (self.algorithm, hit, key, text)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
algorithm = tok.get_uint8()
hit = binascii.unhexlify(tok.get_string().encode())
key = base64.b64decode(tok.get_string().encode())
@ -75,7 +76,7 @@ class HIP(dns.rdata.Rdata):
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(lh, algorithm, lk) = parser.get_struct('!BBH')
(lh, algorithm, lk) = parser.get_struct("!BBH")
hit = parser.get_bytes(lh)
key = parser.get_bytes(lk)
servers = []

View file

@ -30,7 +30,7 @@ class ISDN(dns.rdata.Rdata):
# see: RFC 1183
__slots__ = ['address', 'subaddress']
__slots__ = ["address", "subaddress"]
def __init__(self, rdclass, rdtype, address, subaddress):
super().__init__(rdclass, rdtype)
@ -39,31 +39,33 @@ class ISDN(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
if self.subaddress:
return '"{}" "{}"'.format(dns.rdata._escapify(self.address),
dns.rdata._escapify(self.subaddress))
return '"{}" "{}"'.format(
dns.rdata._escapify(self.address), dns.rdata._escapify(self.subaddress)
)
else:
return '"%s"' % dns.rdata._escapify(self.address)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
address = tok.get_string()
tokens = tok.get_remaining(max_tokens=1)
if len(tokens) >= 1:
subaddress = tokens[0].unescape().value
else:
subaddress = ''
subaddress = ""
return cls(rdclass, rdtype, address, subaddress)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
l = len(self.address)
assert l < 256
file.write(struct.pack('!B', l))
file.write(struct.pack("!B", l))
file.write(self.address)
l = len(self.subaddress)
if l > 0:
assert l < 256
file.write(struct.pack('!B', l))
file.write(struct.pack("!B", l))
file.write(self.subaddress)
@classmethod
@ -72,5 +74,5 @@ class ISDN(dns.rdata.Rdata):
if parser.remaining() > 0:
subaddress = parser.get_counted_bytes()
else:
subaddress = b''
subaddress = b""
return cls(rdclass, rdtype, address, subaddress)

View file

@ -3,6 +3,7 @@
import struct
import dns.immutable
import dns.rdata
@dns.immutable.immutable
@ -12,7 +13,7 @@ class L32(dns.rdata.Rdata):
# see: rfc6742.txt
__slots__ = ['preference', 'locator32']
__slots__ = ["preference", "locator32"]
def __init__(self, rdclass, rdtype, preference, locator32):
super().__init__(rdclass, rdtype)
@ -20,17 +21,18 @@ class L32(dns.rdata.Rdata):
self.locator32 = self._as_ipv4_address(locator32)
def to_text(self, origin=None, relativize=True, **kw):
return f'{self.preference} {self.locator32}'
return f"{self.preference} {self.locator32}"
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
preference = tok.get_uint16()
nodeid = tok.get_identifier()
return cls(rdclass, rdtype, preference, nodeid)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(struct.pack('!H', self.preference))
file.write(struct.pack("!H", self.preference))
file.write(dns.ipv4.inet_aton(self.locator32))
@classmethod

View file

@ -13,33 +13,33 @@ class L64(dns.rdata.Rdata):
# see: rfc6742.txt
__slots__ = ['preference', 'locator64']
__slots__ = ["preference", "locator64"]
def __init__(self, rdclass, rdtype, preference, locator64):
super().__init__(rdclass, rdtype)
self.preference = self._as_uint16(preference)
if isinstance(locator64, bytes):
if len(locator64) != 8:
raise ValueError('invalid locator64')
self.locator64 = dns.rdata._hexify(locator64, 4, b':')
raise ValueError("invalid locator64")
self.locator64 = dns.rdata._hexify(locator64, 4, b":")
else:
dns.rdtypes.util.parse_formatted_hex(locator64, 4, 4, ':')
dns.rdtypes.util.parse_formatted_hex(locator64, 4, 4, ":")
self.locator64 = locator64
def to_text(self, origin=None, relativize=True, **kw):
return f'{self.preference} {self.locator64}'
return f"{self.preference} {self.locator64}"
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
preference = tok.get_uint16()
locator64 = tok.get_identifier()
return cls(rdclass, rdtype, preference, locator64)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(struct.pack('!H', self.preference))
file.write(dns.rdtypes.util.parse_formatted_hex(self.locator64,
4, 4, ':'))
file.write(struct.pack("!H", self.preference))
file.write(dns.rdtypes.util.parse_formatted_hex(self.locator64, 4, 4, ":"))
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):

View file

@ -93,15 +93,15 @@ def _decode_size(what, desc):
def _check_coordinate_list(value, low, high):
if value[0] < low or value[0] > high:
raise ValueError(f'not in range [{low}, {high}]')
raise ValueError(f"not in range [{low}, {high}]")
if value[1] < 0 or value[1] > 59:
raise ValueError('bad minutes value')
raise ValueError("bad minutes value")
if value[2] < 0 or value[2] > 59:
raise ValueError('bad seconds value')
raise ValueError("bad seconds value")
if value[3] < 0 or value[3] > 999:
raise ValueError('bad milliseconds value')
raise ValueError("bad milliseconds value")
if value[4] != 1 and value[4] != -1:
raise ValueError('bad hemisphere value')
raise ValueError("bad hemisphere value")
@dns.immutable.immutable
@ -111,12 +111,26 @@ class LOC(dns.rdata.Rdata):
# see: RFC 1876
__slots__ = ['latitude', 'longitude', 'altitude', 'size',
'horizontal_precision', 'vertical_precision']
__slots__ = [
"latitude",
"longitude",
"altitude",
"size",
"horizontal_precision",
"vertical_precision",
]
def __init__(self, rdclass, rdtype, latitude, longitude, altitude,
size=_default_size, hprec=_default_hprec,
vprec=_default_vprec):
def __init__(
self,
rdclass,
rdtype,
latitude,
longitude,
altitude,
size=_default_size,
hprec=_default_hprec,
vprec=_default_vprec,
):
"""Initialize a LOC record instance.
The parameters I{latitude} and I{longitude} may be either a 4-tuple
@ -145,34 +159,44 @@ class LOC(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
if self.latitude[4] > 0:
lat_hemisphere = 'N'
lat_hemisphere = "N"
else:
lat_hemisphere = 'S'
lat_hemisphere = "S"
if self.longitude[4] > 0:
long_hemisphere = 'E'
long_hemisphere = "E"
else:
long_hemisphere = 'W'
long_hemisphere = "W"
text = "%d %d %d.%03d %s %d %d %d.%03d %s %0.2fm" % (
self.latitude[0], self.latitude[1],
self.latitude[2], self.latitude[3], lat_hemisphere,
self.longitude[0], self.longitude[1], self.longitude[2],
self.longitude[3], long_hemisphere,
self.altitude / 100.0
self.latitude[0],
self.latitude[1],
self.latitude[2],
self.latitude[3],
lat_hemisphere,
self.longitude[0],
self.longitude[1],
self.longitude[2],
self.longitude[3],
long_hemisphere,
self.altitude / 100.0,
)
# do not print default values
if self.size != _default_size or \
self.horizontal_precision != _default_hprec or \
self.vertical_precision != _default_vprec:
if (
self.size != _default_size
or self.horizontal_precision != _default_hprec
or self.vertical_precision != _default_vprec
):
text += " {:0.2f}m {:0.2f}m {:0.2f}m".format(
self.size / 100.0, self.horizontal_precision / 100.0,
self.vertical_precision / 100.0
self.size / 100.0,
self.horizontal_precision / 100.0,
self.vertical_precision / 100.0,
)
return text
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
latitude = [0, 0, 0, 0, 1]
longitude = [0, 0, 0, 0, 1]
size = _default_size
@ -184,16 +208,14 @@ class LOC(dns.rdata.Rdata):
if t.isdigit():
latitude[1] = int(t)
t = tok.get_string()
if '.' in t:
(seconds, milliseconds) = t.split('.')
if "." in t:
(seconds, milliseconds) = t.split(".")
if not seconds.isdigit():
raise dns.exception.SyntaxError(
'bad latitude seconds value')
raise dns.exception.SyntaxError("bad latitude seconds value")
latitude[2] = int(seconds)
l = len(milliseconds)
if l == 0 or l > 3 or not milliseconds.isdigit():
raise dns.exception.SyntaxError(
'bad latitude milliseconds value')
raise dns.exception.SyntaxError("bad latitude milliseconds value")
if l == 1:
m = 100
elif l == 2:
@ -205,26 +227,24 @@ class LOC(dns.rdata.Rdata):
elif t.isdigit():
latitude[2] = int(t)
t = tok.get_string()
if t == 'S':
if t == "S":
latitude[4] = -1
elif t != 'N':
raise dns.exception.SyntaxError('bad latitude hemisphere value')
elif t != "N":
raise dns.exception.SyntaxError("bad latitude hemisphere value")
longitude[0] = tok.get_int()
t = tok.get_string()
if t.isdigit():
longitude[1] = int(t)
t = tok.get_string()
if '.' in t:
(seconds, milliseconds) = t.split('.')
if "." in t:
(seconds, milliseconds) = t.split(".")
if not seconds.isdigit():
raise dns.exception.SyntaxError(
'bad longitude seconds value')
raise dns.exception.SyntaxError("bad longitude seconds value")
longitude[2] = int(seconds)
l = len(milliseconds)
if l == 0 or l > 3 or not milliseconds.isdigit():
raise dns.exception.SyntaxError(
'bad longitude milliseconds value')
raise dns.exception.SyntaxError("bad longitude milliseconds value")
if l == 1:
m = 100
elif l == 2:
@ -236,64 +256,75 @@ class LOC(dns.rdata.Rdata):
elif t.isdigit():
longitude[2] = int(t)
t = tok.get_string()
if t == 'W':
if t == "W":
longitude[4] = -1
elif t != 'E':
raise dns.exception.SyntaxError('bad longitude hemisphere value')
elif t != "E":
raise dns.exception.SyntaxError("bad longitude hemisphere value")
t = tok.get_string()
if t[-1] == 'm':
t = t[0: -1]
altitude = float(t) * 100.0 # m -> cm
if t[-1] == "m":
t = t[0:-1]
altitude = float(t) * 100.0 # m -> cm
tokens = tok.get_remaining(max_tokens=3)
if len(tokens) >= 1:
value = tokens[0].unescape().value
if value[-1] == 'm':
value = value[0: -1]
size = float(value) * 100.0 # m -> cm
if value[-1] == "m":
value = value[0:-1]
size = float(value) * 100.0 # m -> cm
if len(tokens) >= 2:
value = tokens[1].unescape().value
if value[-1] == 'm':
value = value[0: -1]
hprec = float(value) * 100.0 # m -> cm
if value[-1] == "m":
value = value[0:-1]
hprec = float(value) * 100.0 # m -> cm
if len(tokens) >= 3:
value = tokens[2].unescape().value
if value[-1] == 'm':
value = value[0: -1]
vprec = float(value) * 100.0 # m -> cm
if value[-1] == "m":
value = value[0:-1]
vprec = float(value) * 100.0 # m -> cm
# Try encoding these now so we raise if they are bad
_encode_size(size, "size")
_encode_size(hprec, "horizontal precision")
_encode_size(vprec, "vertical precision")
return cls(rdclass, rdtype, latitude, longitude, altitude,
size, hprec, vprec)
return cls(rdclass, rdtype, latitude, longitude, altitude, size, hprec, vprec)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
milliseconds = (self.latitude[0] * 3600000 +
self.latitude[1] * 60000 +
self.latitude[2] * 1000 +
self.latitude[3]) * self.latitude[4]
milliseconds = (
self.latitude[0] * 3600000
+ self.latitude[1] * 60000
+ self.latitude[2] * 1000
+ self.latitude[3]
) * self.latitude[4]
latitude = 0x80000000 + milliseconds
milliseconds = (self.longitude[0] * 3600000 +
self.longitude[1] * 60000 +
self.longitude[2] * 1000 +
self.longitude[3]) * self.longitude[4]
milliseconds = (
self.longitude[0] * 3600000
+ self.longitude[1] * 60000
+ self.longitude[2] * 1000
+ self.longitude[3]
) * self.longitude[4]
longitude = 0x80000000 + milliseconds
altitude = int(self.altitude) + 10000000
size = _encode_size(self.size, "size")
hprec = _encode_size(self.horizontal_precision, "horizontal precision")
vprec = _encode_size(self.vertical_precision, "vertical precision")
wire = struct.pack("!BBBBIII", 0, size, hprec, vprec, latitude,
longitude, altitude)
wire = struct.pack(
"!BBBBIII", 0, size, hprec, vprec, latitude, longitude, altitude
)
file.write(wire)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(version, size, hprec, vprec, latitude, longitude, altitude) = \
parser.get_struct("!BBBBIII")
(
version,
size,
hprec,
vprec,
latitude,
longitude,
altitude,
) = parser.get_struct("!BBBBIII")
if version != 0:
raise dns.exception.FormError("LOC version not zero")
if latitude < _MIN_LATITUDE or latitude > _MAX_LATITUDE:
@ -312,8 +343,7 @@ class LOC(dns.rdata.Rdata):
size = _decode_size(size, "size")
hprec = _decode_size(hprec, "horizontal precision")
vprec = _decode_size(vprec, "vertical precision")
return cls(rdclass, rdtype, latitude, longitude, altitude,
size, hprec, vprec)
return cls(rdclass, rdtype, latitude, longitude, altitude, size, hprec, vprec)
@property
def float_latitude(self):

View file

@ -3,6 +3,7 @@
import struct
import dns.immutable
import dns.rdata
@dns.immutable.immutable
@ -12,7 +13,7 @@ class LP(dns.rdata.Rdata):
# see: rfc6742.txt
__slots__ = ['preference', 'fqdn']
__slots__ = ["preference", "fqdn"]
def __init__(self, rdclass, rdtype, preference, fqdn):
super().__init__(rdclass, rdtype)
@ -21,17 +22,18 @@ class LP(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
fqdn = self.fqdn.choose_relativity(origin, relativize)
return '%d %s' % (self.preference, fqdn)
return "%d %s" % (self.preference, fqdn)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
preference = tok.get_uint16()
fqdn = tok.get_name(origin, relativize, relativize_to)
return cls(rdclass, rdtype, preference, fqdn)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(struct.pack('!H', self.preference))
file.write(struct.pack("!H", self.preference))
self.fqdn.to_wire(file, compress, origin, canonicalize)
@classmethod

View file

@ -13,32 +13,33 @@ class NID(dns.rdata.Rdata):
# see: rfc6742.txt
__slots__ = ['preference', 'nodeid']
__slots__ = ["preference", "nodeid"]
def __init__(self, rdclass, rdtype, preference, nodeid):
super().__init__(rdclass, rdtype)
self.preference = self._as_uint16(preference)
if isinstance(nodeid, bytes):
if len(nodeid) != 8:
raise ValueError('invalid nodeid')
self.nodeid = dns.rdata._hexify(nodeid, 4, b':')
raise ValueError("invalid nodeid")
self.nodeid = dns.rdata._hexify(nodeid, 4, b":")
else:
dns.rdtypes.util.parse_formatted_hex(nodeid, 4, 4, ':')
dns.rdtypes.util.parse_formatted_hex(nodeid, 4, 4, ":")
self.nodeid = nodeid
def to_text(self, origin=None, relativize=True, **kw):
return f'{self.preference} {self.nodeid}'
return f"{self.preference} {self.nodeid}"
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
preference = tok.get_uint16()
nodeid = tok.get_identifier()
return cls(rdclass, rdtype, preference, nodeid)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(struct.pack('!H', self.preference))
file.write(dns.rdtypes.util.parse_formatted_hex(self.nodeid, 4, 4, ':'))
file.write(struct.pack("!H", self.preference))
file.write(dns.rdtypes.util.parse_formatted_hex(self.nodeid, 4, 4, ":"))
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):

View file

@ -25,7 +25,7 @@ import dns.rdtypes.util
@dns.immutable.immutable
class Bitmap(dns.rdtypes.util.Bitmap):
type_name = 'NSEC'
type_name = "NSEC"
@dns.immutable.immutable
@ -33,7 +33,7 @@ class NSEC(dns.rdata.Rdata):
"""NSEC record"""
__slots__ = ['next', 'windows']
__slots__ = ["next", "windows"]
def __init__(self, rdclass, rdtype, next, windows):
super().__init__(rdclass, rdtype)
@ -45,11 +45,12 @@ class NSEC(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
next = self.next.choose_relativity(origin, relativize)
text = Bitmap(self.windows).to_text()
return '{}{}'.format(next, text)
return "{}{}".format(next, text)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
next = tok.get_name(origin, relativize, relativize_to)
windows = Bitmap.from_text(tok)
return cls(rdclass, rdtype, next, windows)

View file

@ -26,10 +26,12 @@ import dns.rdatatype
import dns.rdtypes.util
b32_hex_to_normal = bytes.maketrans(b'0123456789ABCDEFGHIJKLMNOPQRSTUV',
b'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567')
b32_normal_to_hex = bytes.maketrans(b'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567',
b'0123456789ABCDEFGHIJKLMNOPQRSTUV')
b32_hex_to_normal = bytes.maketrans(
b"0123456789ABCDEFGHIJKLMNOPQRSTUV", b"ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"
)
b32_normal_to_hex = bytes.maketrans(
b"ABCDEFGHIJKLMNOPQRSTUVWXYZ234567", b"0123456789ABCDEFGHIJKLMNOPQRSTUV"
)
# hash algorithm constants
SHA1 = 1
@ -40,7 +42,7 @@ OPTOUT = 1
@dns.immutable.immutable
class Bitmap(dns.rdtypes.util.Bitmap):
type_name = 'NSEC3'
type_name = "NSEC3"
@dns.immutable.immutable
@ -48,10 +50,11 @@ class NSEC3(dns.rdata.Rdata):
"""NSEC3 record"""
__slots__ = ['algorithm', 'flags', 'iterations', 'salt', 'next', 'windows']
__slots__ = ["algorithm", "flags", "iterations", "salt", "next", "windows"]
def __init__(self, rdclass, rdtype, algorithm, flags, iterations, salt,
next, windows):
def __init__(
self, rdclass, rdtype, algorithm, flags, iterations, salt, next, windows
):
super().__init__(rdclass, rdtype)
self.algorithm = self._as_uint8(algorithm)
self.flags = self._as_uint8(flags)
@ -63,38 +66,41 @@ class NSEC3(dns.rdata.Rdata):
self.windows = tuple(windows.windows)
def to_text(self, origin=None, relativize=True, **kw):
next = base64.b32encode(self.next).translate(
b32_normal_to_hex).lower().decode()
if self.salt == b'':
salt = '-'
next = base64.b32encode(self.next).translate(b32_normal_to_hex).lower().decode()
if self.salt == b"":
salt = "-"
else:
salt = binascii.hexlify(self.salt).decode()
text = Bitmap(self.windows).to_text()
return '%u %u %u %s %s%s' % (self.algorithm, self.flags,
self.iterations, salt, next, text)
return "%u %u %u %s %s%s" % (
self.algorithm,
self.flags,
self.iterations,
salt,
next,
text,
)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
algorithm = tok.get_uint8()
flags = tok.get_uint8()
iterations = tok.get_uint16()
salt = tok.get_string()
if salt == '-':
salt = b''
if salt == "-":
salt = b""
else:
salt = binascii.unhexlify(salt.encode('ascii'))
next = tok.get_string().encode(
'ascii').upper().translate(b32_hex_to_normal)
salt = binascii.unhexlify(salt.encode("ascii"))
next = tok.get_string().encode("ascii").upper().translate(b32_hex_to_normal)
next = base64.b32decode(next)
bitmap = Bitmap.from_text(tok)
return cls(rdclass, rdtype, algorithm, flags, iterations, salt, next,
bitmap)
return cls(rdclass, rdtype, algorithm, flags, iterations, salt, next, bitmap)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
l = len(self.salt)
file.write(struct.pack("!BBHB", self.algorithm, self.flags,
self.iterations, l))
file.write(struct.pack("!BBHB", self.algorithm, self.flags, self.iterations, l))
file.write(self.salt)
l = len(self.next)
file.write(struct.pack("!B", l))
@ -103,9 +109,8 @@ class NSEC3(dns.rdata.Rdata):
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(algorithm, flags, iterations) = parser.get_struct('!BBH')
(algorithm, flags, iterations) = parser.get_struct("!BBH")
salt = parser.get_counted_bytes()
next = parser.get_counted_bytes()
bitmap = Bitmap.from_wire_parser(parser)
return cls(rdclass, rdtype, algorithm, flags, iterations, salt, next,
bitmap)
return cls(rdclass, rdtype, algorithm, flags, iterations, salt, next, bitmap)

View file

@ -28,7 +28,7 @@ class NSEC3PARAM(dns.rdata.Rdata):
"""NSEC3PARAM record"""
__slots__ = ['algorithm', 'flags', 'iterations', 'salt']
__slots__ = ["algorithm", "flags", "iterations", "salt"]
def __init__(self, rdclass, rdtype, algorithm, flags, iterations, salt):
super().__init__(rdclass, rdtype)
@ -38,34 +38,33 @@ class NSEC3PARAM(dns.rdata.Rdata):
self.salt = self._as_bytes(salt, True, 255)
def to_text(self, origin=None, relativize=True, **kw):
if self.salt == b'':
salt = '-'
if self.salt == b"":
salt = "-"
else:
salt = binascii.hexlify(self.salt).decode()
return '%u %u %u %s' % (self.algorithm, self.flags, self.iterations,
salt)
return "%u %u %u %s" % (self.algorithm, self.flags, self.iterations, salt)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
algorithm = tok.get_uint8()
flags = tok.get_uint8()
iterations = tok.get_uint16()
salt = tok.get_string()
if salt == '-':
salt = ''
if salt == "-":
salt = ""
else:
salt = binascii.unhexlify(salt.encode())
return cls(rdclass, rdtype, algorithm, flags, iterations, salt)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
l = len(self.salt)
file.write(struct.pack("!BBHB", self.algorithm, self.flags,
self.iterations, l))
file.write(struct.pack("!BBHB", self.algorithm, self.flags, self.iterations, l))
file.write(self.salt)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(algorithm, flags, iterations) = parser.get_struct('!BBH')
(algorithm, flags, iterations) = parser.get_struct("!BBH")
salt = parser.get_counted_bytes()
return cls(rdclass, rdtype, algorithm, flags, iterations, salt)

View file

@ -22,6 +22,7 @@ import dns.immutable
import dns.rdata
import dns.tokenizer
@dns.immutable.immutable
class OPENPGPKEY(dns.rdata.Rdata):
@ -37,8 +38,9 @@ class OPENPGPKEY(dns.rdata.Rdata):
return dns.rdata._base64ify(self.key, chunksize=None, **kw)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
b64 = tok.concatenate_remaining_identifiers().encode()
key = base64.b64decode(b64)
return cls(rdclass, rdtype, key)

View file

@ -26,12 +26,13 @@ import dns.rdata
# We don't implement from_text, and that's ok.
# pylint: disable=abstract-method
@dns.immutable.immutable
class OPT(dns.rdata.Rdata):
"""OPT record"""
__slots__ = ['options']
__slots__ = ["options"]
def __init__(self, rdclass, rdtype, options):
"""Initialize an OPT rdata.
@ -45,10 +46,12 @@ class OPT(dns.rdata.Rdata):
"""
super().__init__(rdclass, rdtype)
def as_option(option):
if not isinstance(option, dns.edns.Option):
raise ValueError('option is not a dns.edns.option')
raise ValueError("option is not a dns.edns.option")
return option
self.options = self._as_tuple(options, as_option)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
@ -58,13 +61,13 @@ class OPT(dns.rdata.Rdata):
file.write(owire)
def to_text(self, origin=None, relativize=True, **kw):
return ' '.join(opt.to_text() for opt in self.options)
return " ".join(opt.to_text() for opt in self.options)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
options = []
while parser.remaining() > 0:
(otype, olen) = parser.get_struct('!HH')
(otype, olen) = parser.get_struct("!HH")
with parser.restrict_to(olen):
opt = dns.edns.option_from_wire_parser(otype, parser)
options.append(opt)

View file

@ -28,7 +28,7 @@ class RP(dns.rdata.Rdata):
# see: RFC 1183
__slots__ = ['mbox', 'txt']
__slots__ = ["mbox", "txt"]
def __init__(self, rdclass, rdtype, mbox, txt):
super().__init__(rdclass, rdtype)
@ -41,8 +41,9 @@ class RP(dns.rdata.Rdata):
return "{} {}".format(str(mbox), str(txt))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
mbox = tok.get_name(origin, relativize, relativize_to)
txt = tok.get_name(origin, relativize, relativize_to)
return cls(rdclass, rdtype, mbox, txt)

View file

@ -20,7 +20,7 @@ import calendar
import struct
import time
import dns.dnssec
import dns.dnssectypes
import dns.immutable
import dns.exception
import dns.rdata
@ -43,12 +43,11 @@ def sigtime_to_posixtime(what):
hour = int(what[8:10])
minute = int(what[10:12])
second = int(what[12:14])
return calendar.timegm((year, month, day, hour, minute, second,
0, 0, 0))
return calendar.timegm((year, month, day, hour, minute, second, 0, 0, 0))
def posixtime_to_sigtime(what):
return time.strftime('%Y%m%d%H%M%S', time.gmtime(what))
return time.strftime("%Y%m%d%H%M%S", time.gmtime(what))
@dns.immutable.immutable
@ -56,16 +55,35 @@ class RRSIG(dns.rdata.Rdata):
"""RRSIG record"""
__slots__ = ['type_covered', 'algorithm', 'labels', 'original_ttl',
'expiration', 'inception', 'key_tag', 'signer',
'signature']
__slots__ = [
"type_covered",
"algorithm",
"labels",
"original_ttl",
"expiration",
"inception",
"key_tag",
"signer",
"signature",
]
def __init__(self, rdclass, rdtype, type_covered, algorithm, labels,
original_ttl, expiration, inception, key_tag, signer,
signature):
def __init__(
self,
rdclass,
rdtype,
type_covered,
algorithm,
labels,
original_ttl,
expiration,
inception,
key_tag,
signer,
signature,
):
super().__init__(rdclass, rdtype)
self.type_covered = self._as_rdatatype(type_covered)
self.algorithm = dns.dnssec.Algorithm.make(algorithm)
self.algorithm = dns.dnssectypes.Algorithm.make(algorithm)
self.labels = self._as_uint8(labels)
self.original_ttl = self._as_ttl(original_ttl)
self.expiration = self._as_uint32(expiration)
@ -78,7 +96,7 @@ class RRSIG(dns.rdata.Rdata):
return self.type_covered
def to_text(self, origin=None, relativize=True, **kw):
return '%s %d %d %d %s %s %d %s %s' % (
return "%s %d %d %d %s %s %d %s %s" % (
dns.rdatatype.to_text(self.type_covered),
self.algorithm,
self.labels,
@ -87,14 +105,15 @@ class RRSIG(dns.rdata.Rdata):
posixtime_to_sigtime(self.inception),
self.key_tag,
self.signer.choose_relativity(origin, relativize),
dns.rdata._base64ify(self.signature, **kw)
dns.rdata._base64ify(self.signature, **kw),
)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
type_covered = dns.rdatatype.from_text(tok.get_string())
algorithm = dns.dnssec.algorithm_from_text(tok.get_string())
algorithm = dns.dnssectypes.Algorithm.from_text(tok.get_string())
labels = tok.get_int()
original_ttl = tok.get_ttl()
expiration = sigtime_to_posixtime(tok.get_string())
@ -103,22 +122,38 @@ class RRSIG(dns.rdata.Rdata):
signer = tok.get_name(origin, relativize, relativize_to)
b64 = tok.concatenate_remaining_identifiers().encode()
signature = base64.b64decode(b64)
return cls(rdclass, rdtype, type_covered, algorithm, labels,
original_ttl, expiration, inception, key_tag, signer,
signature)
return cls(
rdclass,
rdtype,
type_covered,
algorithm,
labels,
original_ttl,
expiration,
inception,
key_tag,
signer,
signature,
)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
header = struct.pack('!HBBIIIH', self.type_covered,
self.algorithm, self.labels,
self.original_ttl, self.expiration,
self.inception, self.key_tag)
header = struct.pack(
"!HBBIIIH",
self.type_covered,
self.algorithm,
self.labels,
self.original_ttl,
self.expiration,
self.inception,
self.key_tag,
)
file.write(header)
self.signer.to_wire(file, None, origin, canonicalize)
file.write(self.signature)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
header = parser.get_struct('!HBBIIIH')
header = parser.get_struct("!HBBIIIH")
signer = parser.get_name(origin)
signature = parser.get_remaining()
return cls(rdclass, rdtype, *header, signer, signature)

View file

@ -30,11 +30,11 @@ class SOA(dns.rdata.Rdata):
# see: RFC 1035
__slots__ = ['mname', 'rname', 'serial', 'refresh', 'retry', 'expire',
'minimum']
__slots__ = ["mname", "rname", "serial", "refresh", "retry", "expire", "minimum"]
def __init__(self, rdclass, rdtype, mname, rname, serial, refresh, retry,
expire, minimum):
def __init__(
self, rdclass, rdtype, mname, rname, serial, refresh, retry, expire, minimum
):
super().__init__(rdclass, rdtype)
self.mname = self._as_name(mname)
self.rname = self._as_name(rname)
@ -47,13 +47,20 @@ class SOA(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
mname = self.mname.choose_relativity(origin, relativize)
rname = self.rname.choose_relativity(origin, relativize)
return '%s %s %d %d %d %d %d' % (
mname, rname, self.serial, self.refresh, self.retry,
self.expire, self.minimum)
return "%s %s %d %d %d %d %d" % (
mname,
rname,
self.serial,
self.refresh,
self.retry,
self.expire,
self.minimum,
)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
mname = tok.get_name(origin, relativize, relativize_to)
rname = tok.get_name(origin, relativize, relativize_to)
serial = tok.get_uint32()
@ -61,18 +68,20 @@ class SOA(dns.rdata.Rdata):
retry = tok.get_ttl()
expire = tok.get_ttl()
minimum = tok.get_ttl()
return cls(rdclass, rdtype, mname, rname, serial, refresh, retry,
expire, minimum)
return cls(
rdclass, rdtype, mname, rname, serial, refresh, retry, expire, minimum
)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
self.mname.to_wire(file, compress, origin, canonicalize)
self.rname.to_wire(file, compress, origin, canonicalize)
five_ints = struct.pack('!IIIII', self.serial, self.refresh,
self.retry, self.expire, self.minimum)
five_ints = struct.pack(
"!IIIII", self.serial, self.refresh, self.retry, self.expire, self.minimum
)
file.write(five_ints)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
mname = parser.get_name(origin)
rname = parser.get_name(origin)
return cls(rdclass, rdtype, mname, rname, *parser.get_struct('!IIIII'))
return cls(rdclass, rdtype, mname, rname, *parser.get_struct("!IIIII"))

View file

@ -30,10 +30,9 @@ class SSHFP(dns.rdata.Rdata):
# See RFC 4255
__slots__ = ['algorithm', 'fp_type', 'fingerprint']
__slots__ = ["algorithm", "fp_type", "fingerprint"]
def __init__(self, rdclass, rdtype, algorithm, fp_type,
fingerprint):
def __init__(self, rdclass, rdtype, algorithm, fp_type, fingerprint):
super().__init__(rdclass, rdtype)
self.algorithm = self._as_uint8(algorithm)
self.fp_type = self._as_uint8(fp_type)
@ -41,16 +40,17 @@ class SSHFP(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
kw = kw.copy()
chunksize = kw.pop('chunksize', 128)
return '%d %d %s' % (self.algorithm,
self.fp_type,
dns.rdata._hexify(self.fingerprint,
chunksize=chunksize,
**kw))
chunksize = kw.pop("chunksize", 128)
return "%d %d %s" % (
self.algorithm,
self.fp_type,
dns.rdata._hexify(self.fingerprint, chunksize=chunksize, **kw),
)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
algorithm = tok.get_uint8()
fp_type = tok.get_uint8()
fingerprint = tok.concatenate_remaining_identifiers().encode()

View file

@ -18,7 +18,6 @@
import base64
import struct
import dns.dnssec
import dns.immutable
import dns.exception
import dns.rdata
@ -29,11 +28,28 @@ class TKEY(dns.rdata.Rdata):
"""TKEY Record"""
__slots__ = ['algorithm', 'inception', 'expiration', 'mode', 'error',
'key', 'other']
__slots__ = [
"algorithm",
"inception",
"expiration",
"mode",
"error",
"key",
"other",
]
def __init__(self, rdclass, rdtype, algorithm, inception, expiration,
mode, error, key, other=b''):
def __init__(
self,
rdclass,
rdtype,
algorithm,
inception,
expiration,
mode,
error,
key,
other=b"",
):
super().__init__(rdclass, rdtype)
self.algorithm = self._as_name(algorithm)
self.inception = self._as_uint32(inception)
@ -45,17 +61,23 @@ class TKEY(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
_algorithm = self.algorithm.choose_relativity(origin, relativize)
text = '%s %u %u %u %u %s' % (str(_algorithm), self.inception,
self.expiration, self.mode, self.error,
dns.rdata._base64ify(self.key, 0))
text = "%s %u %u %u %u %s" % (
str(_algorithm),
self.inception,
self.expiration,
self.mode,
self.error,
dns.rdata._base64ify(self.key, 0),
)
if len(self.other) > 0:
text += ' %s' % (dns.rdata._base64ify(self.other, 0))
text += " %s" % (dns.rdata._base64ify(self.other, 0))
return text
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
algorithm = tok.get_name(relativize=False)
inception = tok.get_uint32()
expiration = tok.get_uint32()
@ -66,13 +88,15 @@ class TKEY(dns.rdata.Rdata):
other_b64 = tok.concatenate_remaining_identifiers(True).encode()
other = base64.b64decode(other_b64)
return cls(rdclass, rdtype, algorithm, inception, expiration, mode,
error, key, other)
return cls(
rdclass, rdtype, algorithm, inception, expiration, mode, error, key, other
)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
self.algorithm.to_wire(file, compress, origin)
file.write(struct.pack("!IIHH", self.inception, self.expiration,
self.mode, self.error))
file.write(
struct.pack("!IIHH", self.inception, self.expiration, self.mode, self.error)
)
file.write(struct.pack("!H", len(self.key)))
file.write(self.key)
file.write(struct.pack("!H", len(self.other)))
@ -86,8 +110,9 @@ class TKEY(dns.rdata.Rdata):
key = parser.get_counted_bytes(2)
other = parser.get_counted_bytes(2)
return cls(rdclass, rdtype, algorithm, inception, expiration, mode,
error, key, other)
return cls(
rdclass, rdtype, algorithm, inception, expiration, mode, error, key, other
)
# Constants for the mode field - from RFC 2930:
# 2.5 The Mode Field

View file

@ -29,11 +29,28 @@ class TSIG(dns.rdata.Rdata):
"""TSIG record"""
__slots__ = ['algorithm', 'time_signed', 'fudge', 'mac',
'original_id', 'error', 'other']
__slots__ = [
"algorithm",
"time_signed",
"fudge",
"mac",
"original_id",
"error",
"other",
]
def __init__(self, rdclass, rdtype, algorithm, time_signed, fudge, mac,
original_id, error, other):
def __init__(
self,
rdclass,
rdtype,
algorithm,
time_signed,
fudge,
mac,
original_id,
error,
other,
):
"""Initialize a TSIG rdata.
*rdclass*, an ``int`` is the rdataclass of the Rdata.
@ -67,45 +84,60 @@ class TSIG(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
algorithm = self.algorithm.choose_relativity(origin, relativize)
error = dns.rcode.to_text(self.error, True)
text = f"{algorithm} {self.time_signed} {self.fudge} " + \
f"{len(self.mac)} {dns.rdata._base64ify(self.mac, 0)} " + \
f"{self.original_id} {error} {len(self.other)}"
text = (
f"{algorithm} {self.time_signed} {self.fudge} "
+ f"{len(self.mac)} {dns.rdata._base64ify(self.mac, 0)} "
+ f"{self.original_id} {error} {len(self.other)}"
)
if self.other:
text += f" {dns.rdata._base64ify(self.other, 0)}"
return text
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
algorithm = tok.get_name(relativize=False)
time_signed = tok.get_uint48()
fudge = tok.get_uint16()
mac_len = tok.get_uint16()
mac = base64.b64decode(tok.get_string())
if len(mac) != mac_len:
raise SyntaxError('invalid MAC')
raise SyntaxError("invalid MAC")
original_id = tok.get_uint16()
error = dns.rcode.from_text(tok.get_string())
other_len = tok.get_uint16()
if other_len > 0:
other = base64.b64decode(tok.get_string())
if len(other) != other_len:
raise SyntaxError('invalid other data')
raise SyntaxError("invalid other data")
else:
other = b''
return cls(rdclass, rdtype, algorithm, time_signed, fudge, mac,
original_id, error, other)
other = b""
return cls(
rdclass,
rdtype,
algorithm,
time_signed,
fudge,
mac,
original_id,
error,
other,
)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
self.algorithm.to_wire(file, None, origin, False)
file.write(struct.pack('!HIHH',
(self.time_signed >> 32) & 0xffff,
self.time_signed & 0xffffffff,
self.fudge,
len(self.mac)))
file.write(
struct.pack(
"!HIHH",
(self.time_signed >> 32) & 0xFFFF,
self.time_signed & 0xFFFFFFFF,
self.fudge,
len(self.mac),
)
)
file.write(self.mac)
file.write(struct.pack('!HHH', self.original_id, self.error,
len(self.other)))
file.write(struct.pack("!HHH", self.original_id, self.error, len(self.other)))
file.write(self.other)
@classmethod
@ -114,7 +146,16 @@ class TSIG(dns.rdata.Rdata):
time_signed = parser.get_uint48()
fudge = parser.get_uint16()
mac = parser.get_counted_bytes(2)
(original_id, error) = parser.get_struct('!HH')
(original_id, error) = parser.get_struct("!HH")
other = parser.get_counted_bytes(2)
return cls(rdclass, rdtype, algorithm, time_signed, fudge, mac,
original_id, error, other)
return cls(
rdclass,
rdtype,
algorithm,
time_signed,
fudge,
mac,
original_id,
error,
other,
)

View file

@ -32,7 +32,7 @@ class URI(dns.rdata.Rdata):
# see RFC 7553
__slots__ = ['priority', 'weight', 'target']
__slots__ = ["priority", "weight", "target"]
def __init__(self, rdclass, rdtype, priority, weight, target):
super().__init__(rdclass, rdtype)
@ -43,12 +43,12 @@ class URI(dns.rdata.Rdata):
raise dns.exception.SyntaxError("URI target cannot be empty")
def to_text(self, origin=None, relativize=True, **kw):
return '%d %d "%s"' % (self.priority, self.weight,
self.target.decode())
return '%d %d "%s"' % (self.priority, self.weight, self.target.decode())
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
priority = tok.get_uint16()
weight = tok.get_uint16()
target = tok.get().unescape()
@ -63,10 +63,10 @@ class URI(dns.rdata.Rdata):
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(priority, weight) = parser.get_struct('!HH')
(priority, weight) = parser.get_struct("!HH")
target = parser.get_remaining()
if len(target) == 0:
raise dns.exception.FormError('URI target may not be empty')
raise dns.exception.FormError("URI target may not be empty")
return cls(rdclass, rdtype, priority, weight, target)
def _processing_priority(self):

View file

@ -30,7 +30,7 @@ class X25(dns.rdata.Rdata):
# see RFC 1183
__slots__ = ['address']
__slots__ = ["address"]
def __init__(self, rdclass, rdtype, address):
super().__init__(rdclass, rdtype)
@ -40,15 +40,16 @@ class X25(dns.rdata.Rdata):
return '"%s"' % dns.rdata._escapify(self.address)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
address = tok.get_string()
return cls(rdclass, rdtype, address)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
l = len(self.address)
assert l < 256
file.write(struct.pack('!B', l))
file.write(struct.pack("!B", l))
file.write(self.address)
@classmethod

View file

@ -6,7 +6,7 @@ import binascii
import dns.immutable
import dns.rdata
import dns.rdatatype
import dns.zone
import dns.zonetypes
@dns.immutable.immutable
@ -16,35 +16,38 @@ class ZONEMD(dns.rdata.Rdata):
# See RFC 8976
__slots__ = ['serial', 'scheme', 'hash_algorithm', 'digest']
__slots__ = ["serial", "scheme", "hash_algorithm", "digest"]
def __init__(self, rdclass, rdtype, serial, scheme, hash_algorithm, digest):
super().__init__(rdclass, rdtype)
self.serial = self._as_uint32(serial)
self.scheme = dns.zone.DigestScheme.make(scheme)
self.hash_algorithm = dns.zone.DigestHashAlgorithm.make(hash_algorithm)
self.scheme = dns.zonetypes.DigestScheme.make(scheme)
self.hash_algorithm = dns.zonetypes.DigestHashAlgorithm.make(hash_algorithm)
self.digest = self._as_bytes(digest)
if self.scheme == 0: # reserved, RFC 8976 Sec. 5.2
raise ValueError('scheme 0 is reserved')
raise ValueError("scheme 0 is reserved")
if self.hash_algorithm == 0: # reserved, RFC 8976 Sec. 5.3
raise ValueError('hash_algorithm 0 is reserved')
raise ValueError("hash_algorithm 0 is reserved")
hasher = dns.zone._digest_hashers.get(self.hash_algorithm)
hasher = dns.zonetypes._digest_hashers.get(self.hash_algorithm)
if hasher and hasher().digest_size != len(self.digest):
raise ValueError('digest length inconsistent with hash algorithm')
raise ValueError("digest length inconsistent with hash algorithm")
def to_text(self, origin=None, relativize=True, **kw):
kw = kw.copy()
chunksize = kw.pop('chunksize', 128)
return '%d %d %d %s' % (self.serial, self.scheme, self.hash_algorithm,
dns.rdata._hexify(self.digest,
chunksize=chunksize,
**kw))
chunksize = kw.pop("chunksize", 128)
return "%d %d %d %s" % (
self.serial,
self.scheme,
self.hash_algorithm,
dns.rdata._hexify(self.digest, chunksize=chunksize, **kw),
)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
serial = tok.get_uint32()
scheme = tok.get_uint8()
hash_algorithm = tok.get_uint8()
@ -53,8 +56,7 @@ class ZONEMD(dns.rdata.Rdata):
return cls(rdclass, rdtype, serial, scheme, hash_algorithm, digest)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
header = struct.pack("!IBB", self.serial, self.scheme,
self.hash_algorithm)
header = struct.pack("!IBB", self.serial, self.scheme, self.hash_algorithm)
file.write(header)
file.write(self.digest)

View file

@ -18,51 +18,51 @@
"""Class ANY (generic) rdata type classes."""
__all__ = [
'AFSDB',
'AMTRELAY',
'AVC',
'CAA',
'CDNSKEY',
'CDS',
'CERT',
'CNAME',
'CSYNC',
'DLV',
'DNAME',
'DNSKEY',
'DS',
'EUI48',
'EUI64',
'GPOS',
'HINFO',
'HIP',
'ISDN',
'L32',
'L64',
'LOC',
'LP',
'MX',
'NID',
'NINFO',
'NS',
'NSEC',
'NSEC3',
'NSEC3PARAM',
'OPENPGPKEY',
'OPT',
'PTR',
'RP',
'RRSIG',
'RT',
'SMIMEA',
'SOA',
'SPF',
'SSHFP',
'TKEY',
'TLSA',
'TSIG',
'TXT',
'URI',
'X25',
'ZONEMD',
"AFSDB",
"AMTRELAY",
"AVC",
"CAA",
"CDNSKEY",
"CDS",
"CERT",
"CNAME",
"CSYNC",
"DLV",
"DNAME",
"DNSKEY",
"DS",
"EUI48",
"EUI64",
"GPOS",
"HINFO",
"HIP",
"ISDN",
"L32",
"L64",
"LOC",
"LP",
"MX",
"NID",
"NINFO",
"NS",
"NSEC",
"NSEC3",
"NSEC3PARAM",
"OPENPGPKEY",
"OPT",
"PTR",
"RP",
"RRSIG",
"RT",
"SMIMEA",
"SOA",
"SPF",
"SSHFP",
"TKEY",
"TLSA",
"TSIG",
"TXT",
"URI",
"X25",
"ZONEMD",
]

View file

@ -20,6 +20,7 @@ import struct
import dns.rdtypes.mxbase
import dns.immutable
@dns.immutable.immutable
class A(dns.rdata.Rdata):
@ -28,7 +29,7 @@ class A(dns.rdata.Rdata):
# domain: the domain of the address
# address: the 16-bit address
__slots__ = ['domain', 'address']
__slots__ = ["domain", "address"]
def __init__(self, rdclass, rdtype, domain, address):
super().__init__(rdclass, rdtype)
@ -37,11 +38,12 @@ class A(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
domain = self.domain.choose_relativity(origin, relativize)
return '%s %o' % (domain, self.address)
return "%s %o" % (domain, self.address)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
domain = tok.get_name(origin, relativize, relativize_to)
address = tok.get_uint16(base=8)
return cls(rdclass, rdtype, domain, address)

View file

@ -18,5 +18,5 @@
"""Class CH rdata type classes."""
__all__ = [
'A',
"A",
]

View file

@ -27,7 +27,7 @@ class A(dns.rdata.Rdata):
"""A record."""
__slots__ = ['address']
__slots__ = ["address"]
def __init__(self, rdclass, rdtype, address):
super().__init__(rdclass, rdtype)
@ -37,8 +37,9 @@ class A(dns.rdata.Rdata):
return self.address
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
address = tok.get_identifier()
return cls(rdclass, rdtype, address)

View file

@ -27,7 +27,7 @@ class AAAA(dns.rdata.Rdata):
"""AAAA record."""
__slots__ = ['address']
__slots__ = ["address"]
def __init__(self, rdclass, rdtype, address):
super().__init__(rdclass, rdtype)
@ -37,8 +37,9 @@ class AAAA(dns.rdata.Rdata):
return self.address
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
address = tok.get_identifier()
return cls(rdclass, rdtype, address)

View file

@ -26,12 +26,13 @@ import dns.ipv6
import dns.rdata
import dns.tokenizer
@dns.immutable.immutable
class APLItem:
"""An APL list item."""
__slots__ = ['family', 'negation', 'address', 'prefix']
__slots__ = ["family", "negation", "address", "prefix"]
def __init__(self, family, negation, address, prefix):
self.family = dns.rdata.Rdata._as_uint16(family)
@ -67,12 +68,12 @@ class APLItem:
if address[i] != 0:
last = i + 1
break
address = address[0: last]
address = address[0:last]
l = len(address)
assert l < 128
if self.negation:
l |= 0x80
header = struct.pack('!HBB', self.family, self.prefix, l)
header = struct.pack("!HBB", self.family, self.prefix, l)
file.write(header)
file.write(address)
@ -84,32 +85,33 @@ class APL(dns.rdata.Rdata):
# see: RFC 3123
__slots__ = ['items']
__slots__ = ["items"]
def __init__(self, rdclass, rdtype, items):
super().__init__(rdclass, rdtype)
for item in items:
if not isinstance(item, APLItem):
raise ValueError('item not an APLItem')
raise ValueError("item not an APLItem")
self.items = tuple(items)
def to_text(self, origin=None, relativize=True, **kw):
return ' '.join(map(str, self.items))
return " ".join(map(str, self.items))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
items = []
for token in tok.get_remaining():
item = token.unescape().value
if item[0] == '!':
if item[0] == "!":
negation = True
item = item[1:]
else:
negation = False
(family, rest) = item.split(':', 1)
(family, rest) = item.split(":", 1)
family = int(family)
(address, prefix) = rest.split('/', 1)
(address, prefix) = rest.split("/", 1)
prefix = int(prefix)
item = APLItem(family, negation, address, prefix)
items.append(item)
@ -125,7 +127,7 @@ class APL(dns.rdata.Rdata):
items = []
while parser.remaining() > 0:
header = parser.get_struct('!HBB')
header = parser.get_struct("!HBB")
afdlen = header[2]
if afdlen > 127:
negation = True
@ -136,16 +138,16 @@ class APL(dns.rdata.Rdata):
l = len(address)
if header[0] == 1:
if l < 4:
address += b'\x00' * (4 - l)
address += b"\x00" * (4 - l)
elif header[0] == 2:
if l < 16:
address += b'\x00' * (16 - l)
address += b"\x00" * (16 - l)
else:
#
# This isn't really right according to the RFC, but it
# seems better than throwing an exception
#
address = codecs.encode(address, 'hex_codec')
address = codecs.encode(address, "hex_codec")
item = APLItem(header[0], negation, address, header[1])
items.append(item)
return cls(rdclass, rdtype, items)

View file

@ -19,6 +19,7 @@ import base64
import dns.exception
import dns.immutable
import dns.rdata
@dns.immutable.immutable
@ -28,7 +29,7 @@ class DHCID(dns.rdata.Rdata):
# see: RFC 4701
__slots__ = ['data']
__slots__ = ["data"]
def __init__(self, rdclass, rdtype, data):
super().__init__(rdclass, rdtype)
@ -38,8 +39,9 @@ class DHCID(dns.rdata.Rdata):
return dns.rdata._base64ify(self.data, **kw)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
b64 = tok.concatenate_remaining_identifiers().encode()
data = base64.b64decode(b64)
return cls(rdclass, rdtype, data)

View file

@ -3,6 +3,7 @@
import dns.rdtypes.svcbbase
import dns.immutable
@dns.immutable.immutable
class HTTPS(dns.rdtypes.svcbbase.SVCBBase):
"""HTTPS record"""

View file

@ -24,7 +24,8 @@ import dns.rdtypes.util
class Gateway(dns.rdtypes.util.Gateway):
name = 'IPSECKEY gateway'
name = "IPSECKEY gateway"
@dns.immutable.immutable
class IPSECKEY(dns.rdata.Rdata):
@ -33,10 +34,11 @@ class IPSECKEY(dns.rdata.Rdata):
# see: RFC 4025
__slots__ = ['precedence', 'gateway_type', 'algorithm', 'gateway', 'key']
__slots__ = ["precedence", "gateway_type", "algorithm", "gateway", "key"]
def __init__(self, rdclass, rdtype, precedence, gateway_type, algorithm,
gateway, key):
def __init__(
self, rdclass, rdtype, precedence, gateway_type, algorithm, gateway, key
):
super().__init__(rdclass, rdtype)
gateway = Gateway(gateway_type, gateway)
self.precedence = self._as_uint8(precedence)
@ -46,38 +48,45 @@ class IPSECKEY(dns.rdata.Rdata):
self.key = self._as_bytes(key)
def to_text(self, origin=None, relativize=True, **kw):
gateway = Gateway(self.gateway_type, self.gateway).to_text(origin,
relativize)
return '%d %d %d %s %s' % (self.precedence, self.gateway_type,
self.algorithm, gateway,
dns.rdata._base64ify(self.key, **kw))
gateway = Gateway(self.gateway_type, self.gateway).to_text(origin, relativize)
return "%d %d %d %s %s" % (
self.precedence,
self.gateway_type,
self.algorithm,
gateway,
dns.rdata._base64ify(self.key, **kw),
)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
precedence = tok.get_uint8()
gateway_type = tok.get_uint8()
algorithm = tok.get_uint8()
gateway = Gateway.from_text(gateway_type, tok, origin, relativize,
relativize_to)
gateway = Gateway.from_text(
gateway_type, tok, origin, relativize, relativize_to
)
b64 = tok.concatenate_remaining_identifiers().encode()
key = base64.b64decode(b64)
return cls(rdclass, rdtype, precedence, gateway_type, algorithm,
gateway.gateway, key)
return cls(
rdclass, rdtype, precedence, gateway_type, algorithm, gateway.gateway, key
)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
header = struct.pack("!BBB", self.precedence, self.gateway_type,
self.algorithm)
header = struct.pack("!BBB", self.precedence, self.gateway_type, self.algorithm)
file.write(header)
Gateway(self.gateway_type, self.gateway).to_wire(file, compress,
origin, canonicalize)
Gateway(self.gateway_type, self.gateway).to_wire(
file, compress, origin, canonicalize
)
file.write(self.key)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
header = parser.get_struct('!BBB')
header = parser.get_struct("!BBB")
gateway_type = header[1]
gateway = Gateway.from_wire_parser(gateway_type, parser, origin)
key = parser.get_remaining()
return cls(rdclass, rdtype, header[0], gateway_type, header[2],
gateway.gateway, key)
return cls(
rdclass, rdtype, header[0], gateway_type, header[2], gateway.gateway, key
)

View file

@ -27,7 +27,7 @@ import dns.rdtypes.util
def _write_string(file, s):
l = len(s)
assert l < 256
file.write(struct.pack('!B', l))
file.write(struct.pack("!B", l))
file.write(s)
@ -38,11 +38,11 @@ class NAPTR(dns.rdata.Rdata):
# see: RFC 3403
__slots__ = ['order', 'preference', 'flags', 'service', 'regexp',
'replacement']
__slots__ = ["order", "preference", "flags", "service", "regexp", "replacement"]
def __init__(self, rdclass, rdtype, order, preference, flags, service,
regexp, replacement):
def __init__(
self, rdclass, rdtype, order, preference, flags, service, regexp, replacement
):
super().__init__(rdclass, rdtype)
self.flags = self._as_bytes(flags, True, 255)
self.service = self._as_bytes(service, True, 255)
@ -53,24 +53,28 @@ class NAPTR(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
replacement = self.replacement.choose_relativity(origin, relativize)
return '%d %d "%s" "%s" "%s" %s' % \
(self.order, self.preference,
dns.rdata._escapify(self.flags),
dns.rdata._escapify(self.service),
dns.rdata._escapify(self.regexp),
replacement)
return '%d %d "%s" "%s" "%s" %s' % (
self.order,
self.preference,
dns.rdata._escapify(self.flags),
dns.rdata._escapify(self.service),
dns.rdata._escapify(self.regexp),
replacement,
)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
order = tok.get_uint16()
preference = tok.get_uint16()
flags = tok.get_string()
service = tok.get_string()
regexp = tok.get_string()
replacement = tok.get_name(origin, relativize, relativize_to)
return cls(rdclass, rdtype, order, preference, flags, service,
regexp, replacement)
return cls(
rdclass, rdtype, order, preference, flags, service, regexp, replacement
)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
two_ints = struct.pack("!HH", self.order, self.preference)
@ -82,14 +86,22 @@ class NAPTR(dns.rdata.Rdata):
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(order, preference) = parser.get_struct('!HH')
(order, preference) = parser.get_struct("!HH")
strings = []
for _ in range(3):
s = parser.get_counted_bytes()
strings.append(s)
replacement = parser.get_name(origin)
return cls(rdclass, rdtype, order, preference, strings[0], strings[1],
strings[2], replacement)
return cls(
rdclass,
rdtype,
order,
preference,
strings[0],
strings[1],
strings[2],
replacement,
)
def _processing_priority(self):
return (self.order, self.preference)

View file

@ -30,7 +30,7 @@ class NSAP(dns.rdata.Rdata):
# see: RFC 1706
__slots__ = ['address']
__slots__ = ["address"]
def __init__(self, rdclass, rdtype, address):
super().__init__(rdclass, rdtype)
@ -40,14 +40,15 @@ class NSAP(dns.rdata.Rdata):
return "0x%s" % binascii.hexlify(self.address).decode()
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
address = tok.get_string()
if address[0:2] != '0x':
raise dns.exception.SyntaxError('string does not start with 0x')
address = address[2:].replace('.', '')
if address[0:2] != "0x":
raise dns.exception.SyntaxError("string does not start with 0x")
address = address[2:].replace(".", "")
if len(address) % 2 != 0:
raise dns.exception.SyntaxError('hexstring has odd length')
raise dns.exception.SyntaxError("hexstring has odd length")
address = binascii.unhexlify(address.encode())
return cls(rdclass, rdtype, address)

View file

@ -31,7 +31,7 @@ class PX(dns.rdata.Rdata):
# see: RFC 2163
__slots__ = ['preference', 'map822', 'mapx400']
__slots__ = ["preference", "map822", "mapx400"]
def __init__(self, rdclass, rdtype, preference, map822, mapx400):
super().__init__(rdclass, rdtype)
@ -42,11 +42,12 @@ class PX(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
map822 = self.map822.choose_relativity(origin, relativize)
mapx400 = self.mapx400.choose_relativity(origin, relativize)
return '%d %s %s' % (self.preference, map822, mapx400)
return "%d %s %s" % (self.preference, map822, mapx400)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
preference = tok.get_uint16()
map822 = tok.get_name(origin, relativize, relativize_to)
mapx400 = tok.get_name(origin, relativize, relativize_to)

View file

@ -31,7 +31,7 @@ class SRV(dns.rdata.Rdata):
# see: RFC 2782
__slots__ = ['priority', 'weight', 'port', 'target']
__slots__ = ["priority", "weight", "port", "target"]
def __init__(self, rdclass, rdtype, priority, weight, port, target):
super().__init__(rdclass, rdtype)
@ -42,12 +42,12 @@ class SRV(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
target = self.target.choose_relativity(origin, relativize)
return '%d %d %d %s' % (self.priority, self.weight, self.port,
target)
return "%d %d %d %s" % (self.priority, self.weight, self.port, target)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
priority = tok.get_uint16()
weight = tok.get_uint16()
port = tok.get_uint16()
@ -61,7 +61,7 @@ class SRV(dns.rdata.Rdata):
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(priority, weight, port) = parser.get_struct('!HHH')
(priority, weight, port) = parser.get_struct("!HHH")
target = parser.get_name(origin)
return cls(rdclass, rdtype, priority, weight, port, target)

View file

@ -3,6 +3,7 @@
import dns.rdtypes.svcbbase
import dns.immutable
@dns.immutable.immutable
class SVCB(dns.rdtypes.svcbbase.SVCBBase):
"""SVCB record"""

View file

@ -23,13 +23,14 @@ import dns.immutable
import dns.rdata
try:
_proto_tcp = socket.getprotobyname('tcp')
_proto_udp = socket.getprotobyname('udp')
_proto_tcp = socket.getprotobyname("tcp")
_proto_udp = socket.getprotobyname("udp")
except OSError:
# Fall back to defaults in case /etc/protocols is unavailable.
_proto_tcp = 6
_proto_udp = 17
@dns.immutable.immutable
class WKS(dns.rdata.Rdata):
@ -37,7 +38,7 @@ class WKS(dns.rdata.Rdata):
# see: RFC 1035
__slots__ = ['address', 'protocol', 'bitmap']
__slots__ = ["address", "protocol", "bitmap"]
def __init__(self, rdclass, rdtype, address, protocol, bitmap):
super().__init__(rdclass, rdtype)
@ -51,12 +52,13 @@ class WKS(dns.rdata.Rdata):
for j in range(0, 8):
if byte & (0x80 >> j):
bits.append(str(i * 8 + j))
text = ' '.join(bits)
return '%s %d %s' % (self.address, self.protocol, text)
text = " ".join(bits)
return "%s %d %s" % (self.address, self.protocol, text)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
address = tok.get_string()
protocol = tok.get_string()
if protocol.isdigit():
@ -87,7 +89,7 @@ class WKS(dns.rdata.Rdata):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(dns.ipv4.inet_aton(self.address))
protocol = struct.pack('!B', self.protocol)
protocol = struct.pack("!B", self.protocol)
file.write(protocol)
file.write(self.bitmap)

View file

@ -18,18 +18,18 @@
"""Class IN rdata type classes."""
__all__ = [
'A',
'AAAA',
'APL',
'DHCID',
'HTTPS',
'IPSECKEY',
'KX',
'NAPTR',
'NSAP',
'NSAP_PTR',
'PX',
'SRV',
'SVCB',
'WKS',
"A",
"AAAA",
"APL",
"DHCID",
"HTTPS",
"IPSECKEY",
"KX",
"NAPTR",
"NSAP",
"NSAP_PTR",
"PX",
"SRV",
"SVCB",
"WKS",
]

View file

@ -18,16 +18,16 @@
"""DNS rdata type classes"""
__all__ = [
'ANY',
'IN',
'CH',
'dnskeybase',
'dsbase',
'euibase',
'mxbase',
'nsbase',
'svcbbase',
'tlsabase',
'txtbase',
'util'
"ANY",
"IN",
"CH",
"dnskeybase",
"dsbase",
"euibase",
"mxbase",
"nsbase",
"svcbbase",
"tlsabase",
"txtbase",
"util",
]

View file

@ -21,11 +21,12 @@ import struct
import dns.exception
import dns.immutable
import dns.dnssec
import dns.dnssectypes
import dns.rdata
# wildcard import
__all__ = ["SEP", "REVOKE", "ZONE"] # noqa: F822
__all__ = ["SEP", "REVOKE", "ZONE"] # noqa: F822
class Flag(enum.IntFlag):
SEP = 0x0001
@ -38,22 +39,27 @@ class DNSKEYBase(dns.rdata.Rdata):
"""Base class for rdata that is like a DNSKEY record"""
__slots__ = ['flags', 'protocol', 'algorithm', 'key']
__slots__ = ["flags", "protocol", "algorithm", "key"]
def __init__(self, rdclass, rdtype, flags, protocol, algorithm, key):
super().__init__(rdclass, rdtype)
self.flags = self._as_uint16(flags)
self.protocol = self._as_uint8(protocol)
self.algorithm = dns.dnssec.Algorithm.make(algorithm)
self.algorithm = dns.dnssectypes.Algorithm.make(algorithm)
self.key = self._as_bytes(key)
def to_text(self, origin=None, relativize=True, **kw):
return '%d %d %d %s' % (self.flags, self.protocol, self.algorithm,
dns.rdata._base64ify(self.key, **kw))
return "%d %d %d %s" % (
self.flags,
self.protocol,
self.algorithm,
dns.rdata._base64ify(self.key, **kw),
)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
flags = tok.get_uint16()
protocol = tok.get_uint8()
algorithm = tok.get_string()
@ -68,10 +74,10 @@ class DNSKEYBase(dns.rdata.Rdata):
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
header = parser.get_struct('!HBB')
header = parser.get_struct("!HBB")
key = parser.get_remaining()
return cls(rdclass, rdtype, header[0], header[1], header[2],
key)
return cls(rdclass, rdtype, header[0], header[1], header[2], key)
### BEGIN generated Flag constants

View file

@ -1,38 +0,0 @@
from typing import Set, Any
SEP : int
REVOKE : int
ZONE : int
def flags_to_text_set(flags : int) -> Set[str]:
...
def flags_from_text_set(texts_set) -> int:
...
from .. import rdata
class DNSKEYBase(rdata.Rdata):
def __init__(self, rdclass, rdtype, flags, protocol, algorithm, key):
self.flags : int
self.protocol : int
self.key : str
self.algorithm : int
def to_text(self, origin : Any = None, relativize=True, **kw : Any):
...
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
...
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
...
@classmethod
def from_parser(cls, rdclass, rdtype, parser, origin=None):
...
def flags_to_text_set(self) -> Set[str]:
...

View file

@ -18,7 +18,7 @@
import struct
import binascii
import dns.dnssec
import dns.dnssectypes
import dns.immutable
import dns.rdata
import dns.rdatatype
@ -29,9 +29,10 @@ class DSBase(dns.rdata.Rdata):
"""Base class for rdata that is like a DS record"""
__slots__ = ['key_tag', 'algorithm', 'digest_type', 'digest']
__slots__ = ["key_tag", "algorithm", "digest_type", "digest"]
# Digest types registry: https://www.iana.org/assignments/ds-rr-types/ds-rr-types.xhtml
# Digest types registry:
# https://www.iana.org/assignments/ds-rr-types/ds-rr-types.xhtml
_digest_length_by_type = {
1: 20, # SHA-1, RFC 3658 Sec. 2.4
2: 32, # SHA-256, RFC 4509 Sec. 2.2
@ -39,43 +40,42 @@ class DSBase(dns.rdata.Rdata):
4: 48, # SHA-384, RFC 6605 Sec. 2
}
def __init__(self, rdclass, rdtype, key_tag, algorithm, digest_type,
digest):
def __init__(self, rdclass, rdtype, key_tag, algorithm, digest_type, digest):
super().__init__(rdclass, rdtype)
self.key_tag = self._as_uint16(key_tag)
self.algorithm = dns.dnssec.Algorithm.make(algorithm)
self.algorithm = dns.dnssectypes.Algorithm.make(algorithm)
self.digest_type = self._as_uint8(digest_type)
self.digest = self._as_bytes(digest)
try:
if len(self.digest) != self._digest_length_by_type[self.digest_type]:
raise ValueError('digest length inconsistent with digest type')
raise ValueError("digest length inconsistent with digest type")
except KeyError:
if self.digest_type == 0: # reserved, RFC 3658 Sec. 2.4
raise ValueError('digest type 0 is reserved')
raise ValueError("digest type 0 is reserved")
def to_text(self, origin=None, relativize=True, **kw):
kw = kw.copy()
chunksize = kw.pop('chunksize', 128)
return '%d %d %d %s' % (self.key_tag, self.algorithm,
self.digest_type,
dns.rdata._hexify(self.digest,
chunksize=chunksize,
**kw))
chunksize = kw.pop("chunksize", 128)
return "%d %d %d %s" % (
self.key_tag,
self.algorithm,
self.digest_type,
dns.rdata._hexify(self.digest, chunksize=chunksize, **kw),
)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
key_tag = tok.get_uint16()
algorithm = tok.get_string()
digest_type = tok.get_uint8()
digest = tok.concatenate_remaining_identifiers().encode()
digest = binascii.unhexlify(digest)
return cls(rdclass, rdtype, key_tag, algorithm, digest_type,
digest)
return cls(rdclass, rdtype, key_tag, algorithm, digest_type, digest)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
header = struct.pack("!HBB", self.key_tag, self.algorithm,
self.digest_type)
header = struct.pack("!HBB", self.key_tag, self.algorithm, self.digest_type)
file.write(header)
file.write(self.digest)

View file

@ -27,7 +27,7 @@ class EUIBase(dns.rdata.Rdata):
# see: rfc7043.txt
__slots__ = ['eui']
__slots__ = ["eui"]
# define these in subclasses
# byte_len = 6 # 0123456789ab (in hex)
# text_len = byte_len * 3 - 1 # 01-23-45-67-89-ab
@ -36,28 +36,30 @@ class EUIBase(dns.rdata.Rdata):
super().__init__(rdclass, rdtype)
self.eui = self._as_bytes(eui)
if len(self.eui) != self.byte_len:
raise dns.exception.FormError('EUI%s rdata has to have %s bytes'
% (self.byte_len * 8, self.byte_len))
raise dns.exception.FormError(
"EUI%s rdata has to have %s bytes" % (self.byte_len * 8, self.byte_len)
)
def to_text(self, origin=None, relativize=True, **kw):
return dns.rdata._hexify(self.eui, chunksize=2, separator=b'-', **kw)
return dns.rdata._hexify(self.eui, chunksize=2, separator=b"-", **kw)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
text = tok.get_string()
if len(text) != cls.text_len:
raise dns.exception.SyntaxError(
'Input text must have %s characters' % cls.text_len)
"Input text must have %s characters" % cls.text_len
)
for i in range(2, cls.byte_len * 3 - 1, 3):
if text[i] != '-':
raise dns.exception.SyntaxError('Dash expected at position %s'
% i)
text = text.replace('-', '')
if text[i] != "-":
raise dns.exception.SyntaxError("Dash expected at position %s" % i)
text = text.replace("-", "")
try:
data = binascii.unhexlify(text.encode())
except (ValueError, TypeError) as ex:
raise dns.exception.SyntaxError('Hex decoding error: %s' % str(ex))
raise dns.exception.SyntaxError("Hex decoding error: %s" % str(ex))
return cls(rdclass, rdtype, data)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):

View file

@ -31,7 +31,7 @@ class MXBase(dns.rdata.Rdata):
"""Base class for rdata that is like an MX record."""
__slots__ = ['preference', 'exchange']
__slots__ = ["preference", "exchange"]
def __init__(self, rdclass, rdtype, preference, exchange):
super().__init__(rdclass, rdtype)
@ -40,11 +40,12 @@ class MXBase(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
exchange = self.exchange.choose_relativity(origin, relativize)
return '%d %s' % (self.preference, exchange)
return "%d %s" % (self.preference, exchange)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
preference = tok.get_uint16()
exchange = tok.get_name(origin, relativize, relativize_to)
return cls(rdclass, rdtype, preference, exchange)

View file

@ -28,7 +28,7 @@ class NSBase(dns.rdata.Rdata):
"""Base class for rdata that is like an NS record."""
__slots__ = ['target']
__slots__ = ["target"]
def __init__(self, rdclass, rdtype, target):
super().__init__(rdclass, rdtype)
@ -39,8 +39,9 @@ class NSBase(dns.rdata.Rdata):
return str(target)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
target = tok.get_name(origin, relativize, relativize_to)
return cls(rdclass, rdtype, target)

View file

@ -63,44 +63,48 @@ def _validate_key(key):
if isinstance(key, bytes):
# We decode to latin-1 so we get 0-255 as valid and do NOT interpret
# UTF-8 sequences
key = key.decode('latin-1')
key = key.decode("latin-1")
if isinstance(key, str):
if key.lower().startswith('key'):
if key.lower().startswith("key"):
force_generic = True
if key[3:].startswith('0') and len(key) != 4:
if key[3:].startswith("0") and len(key) != 4:
# key has leading zeros
raise ValueError('leading zeros in key')
key = key.replace('-', '_')
raise ValueError("leading zeros in key")
key = key.replace("-", "_")
return (ParamKey.make(key), force_generic)
def key_to_text(key):
return ParamKey.to_text(key).replace('_', '-').lower()
return ParamKey.to_text(key).replace("_", "-").lower()
# Like rdata escapify, but escapes ',' too.
_escaped = b'",\\'
def _escapify(qstring):
text = ''
text = ""
for c in qstring:
if c in _escaped:
text += '\\' + chr(c)
text += "\\" + chr(c)
elif c >= 0x20 and c < 0x7F:
text += chr(c)
else:
text += '\\%03d' % c
text += "\\%03d" % c
return text
def _unescape(value):
if value == '':
if value == "":
return value
unescaped = b''
unescaped = b""
l = len(value)
i = 0
while i < l:
c = value[i]
i += 1
if c == '\\':
if c == "\\":
if i >= l: # pragma: no cover (can't happen via tokenizer get())
raise dns.exception.UnexpectedEnd
c = value[i]
@ -119,7 +123,7 @@ def _unescape(value):
codepoint = int(c) * 100 + int(c2) * 10 + int(c3)
if codepoint > 255:
raise dns.exception.SyntaxError
unescaped += b'%c' % (codepoint)
unescaped += b"%c" % (codepoint)
continue
unescaped += c.encode()
return unescaped
@ -129,21 +133,21 @@ def _split(value):
l = len(value)
i = 0
items = []
unescaped = b''
unescaped = b""
while i < l:
c = value[i]
i += 1
if c == ord('\\'):
if c == ord("\\"):
if i >= l: # pragma: no cover (can't happen via tokenizer get())
raise dns.exception.UnexpectedEnd
c = value[i]
i += 1
unescaped += b'%c' % (c)
elif c == ord(','):
unescaped += b"%c" % (c)
elif c == ord(","):
items.append(unescaped)
unescaped = b''
unescaped = b""
else:
unescaped += b'%c' % (c)
unescaped += b"%c" % (c)
items.append(unescaped)
return items
@ -159,8 +163,8 @@ class Param:
@dns.immutable.immutable
class GenericParam(Param):
"""Generic SVCB parameter
"""
"""Generic SVCB parameter"""
def __init__(self, value):
self.value = dns.rdata.Rdata._as_bytes(value, True)
@ -198,19 +202,19 @@ class MandatoryParam(Param):
prior_k = None
for k in keys:
if k == prior_k:
raise ValueError(f'duplicate key {k:d}')
raise ValueError(f"duplicate key {k:d}")
prior_k = k
if k == ParamKey.MANDATORY:
raise ValueError('listed the mandatory key as mandatory')
raise ValueError("listed the mandatory key as mandatory")
self.keys = tuple(keys)
@classmethod
def from_value(cls, value):
keys = [k.encode() for k in value.split(',')]
keys = [k.encode() for k in value.split(",")]
return cls(keys)
def to_text(self):
return '"' + ','.join([key_to_text(key) for key in self.keys]) + '"'
return '"' + ",".join([key_to_text(key) for key in self.keys]) + '"'
@classmethod
def from_wire_parser(cls, parser, origin=None): # pylint: disable=W0613
@ -219,28 +223,29 @@ class MandatoryParam(Param):
while parser.remaining() > 0:
key = parser.get_uint16()
if key < last_key:
raise dns.exception.FormError('manadatory keys not ascending')
raise dns.exception.FormError("manadatory keys not ascending")
last_key = key
keys.append(key)
return cls(keys)
def to_wire(self, file, origin=None): # pylint: disable=W0613
for key in self.keys:
file.write(struct.pack('!H', key))
file.write(struct.pack("!H", key))
@dns.immutable.immutable
class ALPNParam(Param):
def __init__(self, ids):
self.ids = dns.rdata.Rdata._as_tuple(
ids, lambda x: dns.rdata.Rdata._as_bytes(x, True, 255, False))
ids, lambda x: dns.rdata.Rdata._as_bytes(x, True, 255, False)
)
@classmethod
def from_value(cls, value):
return cls(_split(_unescape(value)))
def to_text(self):
value = ','.join([_escapify(id) for id in self.ids])
value = ",".join([_escapify(id) for id in self.ids])
return '"' + dns.rdata._escapify(value.encode()) + '"'
@classmethod
@ -253,7 +258,7 @@ class ALPNParam(Param):
def to_wire(self, file, origin=None): # pylint: disable=W0613
for id in self.ids:
file.write(struct.pack('!B', len(id)))
file.write(struct.pack("!B", len(id)))
file.write(id)
@ -269,10 +274,10 @@ class NoDefaultALPNParam(Param):
@classmethod
def from_value(cls, value):
if value is None or value == '':
if value is None or value == "":
return None
else:
raise ValueError('no-default-alpn with non-empty value')
raise ValueError("no-default-alpn with non-empty value")
def to_text(self):
raise NotImplementedError # pragma: no cover
@ -306,22 +311,23 @@ class PortParam(Param):
return cls(port)
def to_wire(self, file, origin=None): # pylint: disable=W0613
file.write(struct.pack('!H', self.port))
file.write(struct.pack("!H", self.port))
@dns.immutable.immutable
class IPv4HintParam(Param):
def __init__(self, addresses):
self.addresses = dns.rdata.Rdata._as_tuple(
addresses, dns.rdata.Rdata._as_ipv4_address)
addresses, dns.rdata.Rdata._as_ipv4_address
)
@classmethod
def from_value(cls, value):
addresses = value.split(',')
addresses = value.split(",")
return cls(addresses)
def to_text(self):
return '"' + ','.join(self.addresses) + '"'
return '"' + ",".join(self.addresses) + '"'
@classmethod
def from_wire_parser(cls, parser, origin=None): # pylint: disable=W0613
@ -340,15 +346,16 @@ class IPv4HintParam(Param):
class IPv6HintParam(Param):
def __init__(self, addresses):
self.addresses = dns.rdata.Rdata._as_tuple(
addresses, dns.rdata.Rdata._as_ipv6_address)
addresses, dns.rdata.Rdata._as_ipv6_address
)
@classmethod
def from_value(cls, value):
addresses = value.split(',')
addresses = value.split(",")
return cls(addresses)
def to_text(self):
return '"' + ','.join(self.addresses) + '"'
return '"' + ",".join(self.addresses) + '"'
@classmethod
def from_wire_parser(cls, parser, origin=None): # pylint: disable=W0613
@ -370,13 +377,13 @@ class ECHParam(Param):
@classmethod
def from_value(cls, value):
if '\\' in value:
raise ValueError('escape in ECH value')
if "\\" in value:
raise ValueError("escape in ECH value")
value = base64.b64decode(value.encode())
return cls(value)
def to_text(self):
b64 = base64.b64encode(self.ech).decode('ascii')
b64 = base64.b64encode(self.ech).decode("ascii")
return f'"{b64}"'
@classmethod
@ -407,7 +414,7 @@ def _validate_and_define(params, key, value):
emptiness = cls.emptiness()
if value is None:
if emptiness == Emptiness.NEVER:
raise SyntaxError('value cannot be empty')
raise SyntaxError("value cannot be empty")
value = cls.from_value(value)
else:
if force_generic:
@ -422,9 +429,9 @@ class SVCBBase(dns.rdata.Rdata):
"""Base class for SVCB-like records"""
# see: draft-ietf-dnsop-svcb-https-01
# see: draft-ietf-dnsop-svcb-https-11
__slots__ = ['priority', 'target', 'params']
__slots__ = ["priority", "target", "params"]
def __init__(self, rdclass, rdtype, priority, target, params):
super().__init__(rdclass, rdtype)
@ -433,7 +440,7 @@ class SVCBBase(dns.rdata.Rdata):
for k, v in params.items():
k = ParamKey.make(k)
if not isinstance(v, Param) and v is not None:
raise ValueError("not a Param")
raise ValueError(f"{k:d} not a Param")
self.params = dns.immutable.Dict(params)
# Make sure any parameter listed as mandatory is present in the
# record.
@ -443,12 +450,11 @@ class SVCBBase(dns.rdata.Rdata):
# Note we have to say "not in" as we have None as a value
# so a get() and a not None test would be wrong.
if key not in params:
raise ValueError(f'key {key:d} declared mandatory but not '
'present')
raise ValueError(f"key {key:d} declared mandatory but not present")
# The no-default-alpn parameter requires the alpn parameter.
if ParamKey.NO_DEFAULT_ALPN in params:
if ParamKey.ALPN not in params:
raise ValueError('no-default-alpn present, but alpn missing')
raise ValueError("no-default-alpn present, but alpn missing")
def to_text(self, origin=None, relativize=True, **kw):
target = self.target.choose_relativity(origin, relativize)
@ -458,23 +464,24 @@ class SVCBBase(dns.rdata.Rdata):
if value is None:
params.append(key_to_text(key))
else:
kv = key_to_text(key) + '=' + value.to_text()
kv = key_to_text(key) + "=" + value.to_text()
params.append(kv)
if len(params) > 0:
space = ' '
space = " "
else:
space = ''
return '%d %s%s%s' % (self.priority, target, space, ' '.join(params))
space = ""
return "%d %s%s%s" % (self.priority, target, space, " ".join(params))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
priority = tok.get_uint16()
target = tok.get_name(origin, relativize, relativize_to)
if priority == 0:
token = tok.get()
if not token.is_eol_or_eof():
raise SyntaxError('parameters in AliasMode')
raise SyntaxError("parameters in AliasMode")
tok.unget(token)
params = {}
while True:
@ -483,20 +490,20 @@ class SVCBBase(dns.rdata.Rdata):
tok.unget(token)
break
if token.ttype != dns.tokenizer.IDENTIFIER:
raise SyntaxError('parameter is not an identifier')
equals = token.value.find('=')
raise SyntaxError("parameter is not an identifier")
equals = token.value.find("=")
if equals == len(token.value) - 1:
# 'key=', so next token should be a quoted string without
# any intervening whitespace.
key = token.value[:-1]
token = tok.get(want_leading=True)
if token.ttype != dns.tokenizer.QUOTED_STRING:
raise SyntaxError('whitespace after =')
raise SyntaxError("whitespace after =")
value = token.value
elif equals > 0:
# key=value
key = token.value[:equals]
value = token.value[equals + 1:]
value = token.value[equals + 1 :]
elif equals == 0:
# =key
raise SyntaxError('parameter cannot start with "="')
@ -532,13 +539,13 @@ class SVCBBase(dns.rdata.Rdata):
priority = parser.get_uint16()
target = parser.get_name(origin)
if priority == 0 and parser.remaining() != 0:
raise dns.exception.FormError('parameters in AliasMode')
raise dns.exception.FormError("parameters in AliasMode")
params = {}
prior_key = -1
while parser.remaining() > 0:
key = parser.get_uint16()
if key < prior_key:
raise dns.exception.FormError('keys not in order')
raise dns.exception.FormError("keys not in order")
prior_key = key
vlen = parser.get_uint16()
pcls = _class_for_key.get(key, GenericParam)

View file

@ -30,10 +30,9 @@ class TLSABase(dns.rdata.Rdata):
# see: RFC 6698
__slots__ = ['usage', 'selector', 'mtype', 'cert']
__slots__ = ["usage", "selector", "mtype", "cert"]
def __init__(self, rdclass, rdtype, usage, selector,
mtype, cert):
def __init__(self, rdclass, rdtype, usage, selector, mtype, cert):
super().__init__(rdclass, rdtype)
self.usage = self._as_uint8(usage)
self.selector = self._as_uint8(selector)
@ -42,17 +41,18 @@ class TLSABase(dns.rdata.Rdata):
def to_text(self, origin=None, relativize=True, **kw):
kw = kw.copy()
chunksize = kw.pop('chunksize', 128)
return '%d %d %d %s' % (self.usage,
self.selector,
self.mtype,
dns.rdata._hexify(self.cert,
chunksize=chunksize,
**kw))
chunksize = kw.pop("chunksize", 128)
return "%d %d %d %s" % (
self.usage,
self.selector,
self.mtype,
dns.rdata._hexify(self.cert, chunksize=chunksize, **kw),
)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
):
usage = tok.get_uint8()
selector = tok.get_uint8()
mtype = tok.get_uint8()

View file

@ -17,6 +17,8 @@
"""TXT-like base class."""
from typing import Any, Dict, Iterable, Optional, Tuple, Union
import struct
import dns.exception
@ -30,9 +32,14 @@ class TXTBase(dns.rdata.Rdata):
"""Base class for rdata that is like a TXT record (see RFC 1035)."""
__slots__ = ['strings']
__slots__ = ["strings"]
def __init__(self, rdclass, rdtype, strings):
def __init__(
self,
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
strings: Iterable[Union[bytes, str]],
):
"""Initialize a TXT-like rdata.
*rdclass*, an ``int`` is the rdataclass of the Rdata.
@ -42,27 +49,41 @@ class TXTBase(dns.rdata.Rdata):
*strings*, a tuple of ``bytes``
"""
super().__init__(rdclass, rdtype)
self.strings = self._as_tuple(strings,
lambda x: self._as_bytes(x, True, 255))
self.strings: Tuple[bytes] = self._as_tuple(
strings, lambda x: self._as_bytes(x, True, 255)
)
def to_text(self, origin=None, relativize=True, **kw):
txt = ''
prefix = ''
def to_text(
self,
origin: Optional[dns.name.Name] = None,
relativize: bool = True,
**kw: Dict[str, Any]
) -> str:
txt = ""
prefix = ""
for s in self.strings:
txt += '{}"{}"'.format(prefix, dns.rdata._escapify(s))
prefix = ' '
prefix = " "
return txt
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls,
rdclass: dns.rdataclass.RdataClass,
rdtype: dns.rdatatype.RdataType,
tok: dns.tokenizer.Tokenizer,
origin: Optional[dns.name.Name] = None,
relativize: bool = True,
relativize_to: Optional[dns.name.Name] = None,
) -> dns.rdata.Rdata:
strings = []
for token in tok.get_remaining():
token = token.unescape_to_bytes()
# The 'if' below is always true in the current code, but we
# are leaving this check in in case things change some day.
if not (token.is_quoted_string() or
token.is_identifier()): # pragma: no cover
if not (
token.is_quoted_string() or token.is_identifier()
): # pragma: no cover
raise dns.exception.SyntaxError("expected a string")
if len(token.value) > 255:
raise dns.exception.SyntaxError("string too long")
@ -75,7 +96,7 @@ class TXTBase(dns.rdata.Rdata):
for s in self.strings:
l = len(s)
assert l < 256
file.write(struct.pack('!B', l))
file.write(struct.pack("!B", l))
file.write(s)
@classmethod

View file

@ -1,12 +0,0 @@
import typing
from .. import rdata
class TXTBase(rdata.Rdata):
strings: typing.Tuple[bytes, ...]
def __init__(self, rdclass: int, rdtype: int, strings: typing.Iterable[bytes]) -> None:
...
def to_text(self, origin: typing.Any, relativize: bool, **kw: typing.Any) -> str:
...
class TXT(TXTBase):
...

View file

@ -28,6 +28,7 @@ import dns.rdata
class Gateway:
"""A helper class for the IPSECKEY gateway and AMTRELAY relay fields"""
name = ""
def __init__(self, type, gateway=None):
@ -67,15 +68,17 @@ class Gateway:
raise ValueError(self._invalid_type(self.type)) # pragma: no cover
@classmethod
def from_text(cls, gateway_type, tok, origin=None, relativize=True,
relativize_to=None):
def from_text(
cls, gateway_type, tok, origin=None, relativize=True, relativize_to=None
):
if gateway_type in (0, 1, 2):
gateway = tok.get_string()
elif gateway_type == 3:
gateway = tok.get_name(origin, relativize, relativize_to)
else:
raise dns.exception.SyntaxError(
cls._invalid_type(gateway_type)) # pragma: no cover
cls._invalid_type(gateway_type)
) # pragma: no cover
return cls(gateway_type, gateway)
# pylint: disable=unused-argument
@ -90,6 +93,7 @@ class Gateway:
self.gateway.to_wire(file, None, origin, False)
else:
raise ValueError(self._invalid_type(self.type)) # pragma: no cover
# pylint: enable=unused-argument
@classmethod
@ -109,6 +113,7 @@ class Gateway:
class Bitmap:
"""A helper class for the NSEC/NSEC3/CSYNC type bitmaps"""
type_name = ""
def __init__(self, windows=None):
@ -136,7 +141,7 @@ class Bitmap:
if byte & (0x80 >> j):
rdtype = window * 256 + i * 8 + j
bits.append(dns.rdatatype.to_text(rdtype))
text += (' ' + ' '.join(bits))
text += " " + " ".join(bits)
return text
@classmethod
@ -151,7 +156,7 @@ class Bitmap:
window = 0
octets = 0
prior_rdtype = 0
bitmap = bytearray(b'\0' * 32)
bitmap = bytearray(b"\0" * 32)
windows = []
for rdtype in rdtypes:
if rdtype == prior_rdtype:
@ -161,7 +166,7 @@ class Bitmap:
if new_window != window:
if octets != 0:
windows.append((window, bytes(bitmap[0:octets])))
bitmap = bytearray(b'\0' * 32)
bitmap = bytearray(b"\0" * 32)
window = new_window
offset = rdtype % 256
byte = offset // 8
@ -174,7 +179,7 @@ class Bitmap:
def to_wire(self, file):
for (window, bitmap) in self.windows:
file.write(struct.pack('!BB', window, len(bitmap)))
file.write(struct.pack("!BB", window, len(bitmap)))
file.write(bitmap)
@classmethod
@ -193,6 +198,7 @@ def _priority_table(items):
by_priority[rdata._processing_priority()].append(rdata)
return by_priority
def priority_processing_order(iterable):
items = list(iterable)
if len(items) == 1:
@ -205,8 +211,10 @@ def priority_processing_order(iterable):
ordered.extend(rdatas)
return ordered
_no_weight = 0.1
def weighted_processing_order(iterable):
items = list(iterable)
if len(items) == 1:
@ -215,8 +223,7 @@ def weighted_processing_order(iterable):
ordered = []
for k in sorted(by_priority.keys()):
rdatas = by_priority[k]
total = sum(rdata._processing_weight() or _no_weight
for rdata in rdatas)
total = sum(rdata._processing_weight() or _no_weight for rdata in rdatas)
while len(rdatas) > 1:
r = random.uniform(0, total)
for (n, rdata) in enumerate(rdatas):
@ -230,15 +237,16 @@ def weighted_processing_order(iterable):
ordered.append(rdatas[0])
return ordered
def parse_formatted_hex(formatted, num_chunks, chunk_size, separator):
if len(formatted) != num_chunks * (chunk_size + 1) - 1:
raise ValueError('invalid formatted hex string')
value = b''
raise ValueError("invalid formatted hex string")
value = b""
for _ in range(num_chunks):
chunk = formatted[0:chunk_size]
value += int(chunk, 16).to_bytes(chunk_size // 2, 'big')
value += int(chunk, 16).to_bytes(chunk_size // 2, "big")
formatted = formatted[chunk_size:]
if len(formatted) > 0 and formatted[0] != separator:
raise ValueError('invalid formatted hex string')
raise ValueError("invalid formatted hex string")
formatted = formatted[1:]
return value