Bump dnspython from 2.0.0 to 2.2.0 (#1618)

* Bump dnspython from 2.0.0 to 2.2.0

Bumps [dnspython]() from 2.0.0 to 2.2.0.

---
updated-dependencies:
- dependency-name: dnspython
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Update dnspython==2.2.0

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com>

[skip ci]
This commit is contained in:
dependabot[bot] 2022-01-25 11:08:24 -08:00 committed by GitHub
parent 515a5d42d3
commit 3c93b5600f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
143 changed files with 7498 additions and 2054 deletions

View file

@ -16,8 +16,10 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.mxbase
import dns.immutable
@dns.immutable.immutable
class AFSDB(dns.rdtypes.mxbase.UncompressedDowncasingMX):
"""AFSDB record"""

View file

@ -18,12 +18,19 @@
import struct
import dns.exception
import dns.immutable
import dns.rdtypes.util
class Relay(dns.rdtypes.util.Gateway):
name = 'AMTRELAY relay'
@property
def relay(self):
return self.gateway
@dns.immutable.immutable
class AMTRELAY(dns.rdata.Rdata):
"""AMTRELAY record"""
@ -35,11 +42,11 @@ class AMTRELAY(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, precedence, discovery_optional,
relay_type, relay):
super().__init__(rdclass, rdtype)
Relay(relay_type, relay).check()
object.__setattr__(self, 'precedence', precedence)
object.__setattr__(self, 'discovery_optional', discovery_optional)
object.__setattr__(self, 'relay_type', relay_type)
object.__setattr__(self, 'relay', relay)
relay = Relay(relay_type, relay)
self.precedence = self._as_uint8(precedence)
self.discovery_optional = self._as_bool(discovery_optional)
self.relay_type = relay.type
self.relay = relay.relay
def to_text(self, origin=None, relativize=True, **kw):
relay = Relay(self.relay_type, self.relay).to_text(origin, relativize)
@ -57,10 +64,10 @@ class AMTRELAY(dns.rdata.Rdata):
relay_type = tok.get_uint8()
if relay_type > 0x7f:
raise dns.exception.SyntaxError('expecting an integer <= 127')
relay = Relay(relay_type).from_text(tok, origin, relativize,
relativize_to)
relay = Relay.from_text(relay_type, tok, origin, relativize,
relativize_to)
return cls(rdclass, rdtype, precedence, discovery_optional, relay_type,
relay)
relay.relay)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
relay_type = self.relay_type | (self.discovery_optional << 7)
@ -74,6 +81,6 @@ class AMTRELAY(dns.rdata.Rdata):
(precedence, relay_type) = parser.get_struct('!BB')
discovery_optional = bool(relay_type >> 7)
relay_type &= 0x7f
relay = Relay(relay_type).from_wire_parser(parser, origin)
relay = Relay.from_wire_parser(relay_type, parser, origin)
return cls(rdclass, rdtype, precedence, discovery_optional, relay_type,
relay)
relay.relay)

View file

@ -16,8 +16,10 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.txtbase
import dns.immutable
@dns.immutable.immutable
class AVC(dns.rdtypes.txtbase.TXTBase):
"""AVC record"""

View file

@ -18,10 +18,12 @@
import struct
import dns.exception
import dns.immutable
import dns.rdata
import dns.tokenizer
@dns.immutable.immutable
class CAA(dns.rdata.Rdata):
"""CAA (Certification Authority Authorization) record"""
@ -32,9 +34,11 @@ class CAA(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, flags, tag, value):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'flags', flags)
object.__setattr__(self, 'tag', tag)
object.__setattr__(self, 'value', value)
self.flags = self._as_uint8(flags)
self.tag = self._as_bytes(tag, True, 255)
if not tag.isalnum():
raise ValueError("tag is not alphanumeric")
self.value = self._as_bytes(value)
def to_text(self, origin=None, relativize=True, **kw):
return '%u %s "%s"' % (self.flags,
@ -46,10 +50,6 @@ class CAA(dns.rdata.Rdata):
relativize_to=None):
flags = tok.get_uint8()
tag = tok.get_string().encode()
if len(tag) > 255:
raise dns.exception.SyntaxError("tag too long")
if not tag.isalnum():
raise dns.exception.SyntaxError("tag is not alphanumeric")
value = tok.get_string().encode()
return cls(rdclass, rdtype, flags, tag, value)

View file

@ -16,9 +16,13 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.dnskeybase
import dns.immutable
# pylint: disable=unused-import
from dns.rdtypes.dnskeybase import SEP, REVOKE, ZONE # noqa: F401
# pylint: enable=unused-import
@dns.immutable.immutable
class CDNSKEY(dns.rdtypes.dnskeybase.DNSKEYBase):
"""CDNSKEY record"""

View file

@ -16,8 +16,15 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.dsbase
import dns.immutable
@dns.immutable.immutable
class CDS(dns.rdtypes.dsbase.DSBase):
"""CDS record"""
_digest_length_by_type = {
**dns.rdtypes.dsbase.DSBase._digest_length_by_type,
0: 1, # delete, RFC 8078 Sec. 4 (including Errata ID 5049)
}

View file

@ -19,6 +19,7 @@ import struct
import base64
import dns.exception
import dns.immutable
import dns.dnssec
import dns.rdata
import dns.tokenizer
@ -27,6 +28,11 @@ _ctype_by_value = {
1: 'PKIX',
2: 'SPKI',
3: 'PGP',
4: 'IPKIX',
5: 'ISPKI',
6: 'IPGP',
7: 'ACPKIX',
8: 'IACPKIX',
253: 'URI',
254: 'OID',
}
@ -35,6 +41,11 @@ _ctype_by_name = {
'PKIX': 1,
'SPKI': 2,
'PGP': 3,
'IPKIX': 4,
'ISPKI': 5,
'IPGP': 6,
'ACPKIX': 7,
'IACPKIX': 8,
'URI': 253,
'OID': 254,
}
@ -54,27 +65,28 @@ def _ctype_to_text(what):
return str(what)
@dns.immutable.immutable
class CERT(dns.rdata.Rdata):
"""CERT record"""
# see RFC 2538
# see RFC 4398
__slots__ = ['certificate_type', 'key_tag', 'algorithm', 'certificate']
def __init__(self, rdclass, rdtype, certificate_type, key_tag, algorithm,
certificate):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'certificate_type', certificate_type)
object.__setattr__(self, 'key_tag', key_tag)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'certificate', certificate)
self.certificate_type = self._as_uint16(certificate_type)
self.key_tag = self._as_uint16(key_tag)
self.algorithm = self._as_uint8(algorithm)
self.certificate = self._as_bytes(certificate)
def to_text(self, origin=None, relativize=True, **kw):
certificate_type = _ctype_to_text(self.certificate_type)
return "%s %d %s %s" % (certificate_type, self.key_tag,
dns.dnssec.algorithm_to_text(self.algorithm),
dns.rdata._base64ify(self.certificate))
dns.rdata._base64ify(self.certificate, **kw))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
@ -82,8 +94,6 @@ class CERT(dns.rdata.Rdata):
certificate_type = _ctype_from_text(tok.get_string())
key_tag = tok.get_uint16()
algorithm = dns.dnssec.algorithm_from_text(tok.get_string())
if algorithm < 0 or algorithm > 255:
raise dns.exception.SyntaxError("bad algorithm type")
b64 = tok.concatenate_remaining_identifiers().encode()
certificate = base64.b64decode(b64)
return cls(rdclass, rdtype, certificate_type, key_tag,

View file

@ -16,8 +16,10 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.nsbase
import dns.immutable
@dns.immutable.immutable
class CNAME(dns.rdtypes.nsbase.NSBase):
"""CNAME record

View file

@ -18,16 +18,19 @@
import struct
import dns.exception
import dns.immutable
import dns.rdata
import dns.rdatatype
import dns.name
import dns.rdtypes.util
@dns.immutable.immutable
class Bitmap(dns.rdtypes.util.Bitmap):
type_name = 'CSYNC'
@dns.immutable.immutable
class CSYNC(dns.rdata.Rdata):
"""CSYNC record"""
@ -36,9 +39,11 @@ class CSYNC(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, serial, flags, windows):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'serial', serial)
object.__setattr__(self, 'flags', flags)
object.__setattr__(self, 'windows', dns.rdata._constify(windows))
self.serial = self._as_uint32(serial)
self.flags = self._as_uint16(flags)
if not isinstance(windows, Bitmap):
windows = Bitmap(windows)
self.windows = tuple(windows.windows)
def to_text(self, origin=None, relativize=True, **kw):
text = Bitmap(self.windows).to_text()
@ -49,8 +54,8 @@ class CSYNC(dns.rdata.Rdata):
relativize_to=None):
serial = tok.get_uint32()
flags = tok.get_uint16()
windows = Bitmap().from_text(tok)
return cls(rdclass, rdtype, serial, flags, windows)
bitmap = Bitmap.from_text(tok)
return cls(rdclass, rdtype, serial, flags, bitmap)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(struct.pack('!IH', self.serial, self.flags))
@ -59,5 +64,5 @@ class CSYNC(dns.rdata.Rdata):
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(serial, flags) = parser.get_struct("!IH")
windows = Bitmap().from_wire_parser(parser)
return cls(rdclass, rdtype, serial, flags, windows)
bitmap = Bitmap.from_wire_parser(parser)
return cls(rdclass, rdtype, serial, flags, bitmap)

View file

@ -16,8 +16,10 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.dsbase
import dns.immutable
@dns.immutable.immutable
class DLV(dns.rdtypes.dsbase.DSBase):
"""DLV record"""

View file

@ -16,8 +16,10 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.nsbase
import dns.immutable
@dns.immutable.immutable
class DNAME(dns.rdtypes.nsbase.UncompressedNS):
"""DNAME record"""

View file

@ -16,9 +16,13 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.dnskeybase
import dns.immutable
# pylint: disable=unused-import
from dns.rdtypes.dnskeybase import SEP, REVOKE, ZONE # noqa: F401
# pylint: enable=unused-import
@dns.immutable.immutable
class DNSKEY(dns.rdtypes.dnskeybase.DNSKEYBase):
"""DNSKEY record"""

View file

@ -16,8 +16,10 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.dsbase
import dns.immutable
@dns.immutable.immutable
class DS(dns.rdtypes.dsbase.DSBase):
"""DS record"""

View file

@ -17,8 +17,10 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.euibase
import dns.immutable
@dns.immutable.immutable
class EUI48(dns.rdtypes.euibase.EUIBase):
"""EUI48 record"""

View file

@ -17,8 +17,10 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.euibase
import dns.immutable
@dns.immutable.immutable
class EUI64(dns.rdtypes.euibase.EUIBase):
"""EUI64 record"""

View file

@ -18,6 +18,7 @@
import struct
import dns.exception
import dns.immutable
import dns.rdata
import dns.tokenizer
@ -41,12 +42,7 @@ def _validate_float_string(what):
raise dns.exception.FormError
def _sanitize(value):
if isinstance(value, str):
return value.encode()
return value
@dns.immutable.immutable
class GPOS(dns.rdata.Rdata):
"""GPOS record"""
@ -66,15 +62,15 @@ class GPOS(dns.rdata.Rdata):
if isinstance(altitude, float) or \
isinstance(altitude, int):
altitude = str(altitude)
latitude = _sanitize(latitude)
longitude = _sanitize(longitude)
altitude = _sanitize(altitude)
latitude = self._as_bytes(latitude, True, 255)
longitude = self._as_bytes(longitude, True, 255)
altitude = self._as_bytes(altitude, True, 255)
_validate_float_string(latitude)
_validate_float_string(longitude)
_validate_float_string(altitude)
object.__setattr__(self, 'latitude', latitude)
object.__setattr__(self, 'longitude', longitude)
object.__setattr__(self, 'altitude', altitude)
self.latitude = latitude
self.longitude = longitude
self.altitude = altitude
flat = self.float_latitude
if flat < -90.0 or flat > 90.0:
raise dns.exception.FormError('bad latitude')
@ -93,7 +89,6 @@ class GPOS(dns.rdata.Rdata):
latitude = tok.get_string()
longitude = tok.get_string()
altitude = tok.get_string()
tok.get_eol()
return cls(rdclass, rdtype, latitude, longitude, altitude)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):

View file

@ -18,10 +18,12 @@
import struct
import dns.exception
import dns.immutable
import dns.rdata
import dns.tokenizer
@dns.immutable.immutable
class HINFO(dns.rdata.Rdata):
"""HINFO record"""
@ -32,14 +34,8 @@ class HINFO(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, cpu, os):
super().__init__(rdclass, rdtype)
if isinstance(cpu, str):
object.__setattr__(self, 'cpu', cpu.encode())
else:
object.__setattr__(self, 'cpu', cpu)
if isinstance(os, str):
object.__setattr__(self, 'os', os.encode())
else:
object.__setattr__(self, 'os', os)
self.cpu = self._as_bytes(cpu, True, 255)
self.os = self._as_bytes(os, True, 255)
def to_text(self, origin=None, relativize=True, **kw):
return '"{}" "{}"'.format(dns.rdata._escapify(self.cpu),
@ -50,7 +46,6 @@ class HINFO(dns.rdata.Rdata):
relativize_to=None):
cpu = tok.get_string(max_length=255)
os = tok.get_string(max_length=255)
tok.get_eol()
return cls(rdclass, rdtype, cpu, os)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):

View file

@ -20,10 +20,12 @@ import base64
import binascii
import dns.exception
import dns.immutable
import dns.rdata
import dns.rdatatype
@dns.immutable.immutable
class HIP(dns.rdata.Rdata):
"""HIP record"""
@ -34,10 +36,10 @@ class HIP(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, hit, algorithm, key, servers):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'hit', hit)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'key', key)
object.__setattr__(self, 'servers', dns.rdata._constify(servers))
self.hit = self._as_bytes(hit, True, 255)
self.algorithm = self._as_uint8(algorithm)
self.key = self._as_bytes(key, True)
self.servers = self._as_tuple(servers, self._as_name)
def to_text(self, origin=None, relativize=True, **kw):
hit = binascii.hexlify(self.hit).decode()
@ -55,14 +57,9 @@ class HIP(dns.rdata.Rdata):
relativize_to=None):
algorithm = tok.get_uint8()
hit = binascii.unhexlify(tok.get_string().encode())
if len(hit) > 255:
raise dns.exception.SyntaxError("HIT too long")
key = base64.b64decode(tok.get_string().encode())
servers = []
while 1:
token = tok.get()
if token.is_eol_or_eof():
break
for token in tok.get_remaining():
server = tok.as_name(token, origin, relativize, relativize_to)
servers.append(server)
return cls(rdclass, rdtype, hit, algorithm, key, servers)

View file

@ -18,10 +18,12 @@
import struct
import dns.exception
import dns.immutable
import dns.rdata
import dns.tokenizer
@dns.immutable.immutable
class ISDN(dns.rdata.Rdata):
"""ISDN record"""
@ -32,14 +34,8 @@ class ISDN(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, address, subaddress):
super().__init__(rdclass, rdtype)
if isinstance(address, str):
object.__setattr__(self, 'address', address.encode())
else:
object.__setattr__(self, 'address', address)
if isinstance(address, str):
object.__setattr__(self, 'subaddress', subaddress.encode())
else:
object.__setattr__(self, 'subaddress', subaddress)
self.address = self._as_bytes(address, True, 255)
self.subaddress = self._as_bytes(subaddress, True, 255)
def to_text(self, origin=None, relativize=True, **kw):
if self.subaddress:
@ -52,14 +48,11 @@ class ISDN(dns.rdata.Rdata):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
address = tok.get_string()
t = tok.get()
if not t.is_eol_or_eof():
tok.unget(t)
subaddress = tok.get_string()
tokens = tok.get_remaining(max_tokens=1)
if len(tokens) >= 1:
subaddress = tokens[0].unescape().value
else:
tok.unget(t)
subaddress = ''
tok.get_eol()
return cls(rdclass, rdtype, address, subaddress)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):

View file

@ -0,0 +1,40 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
import struct
import dns.immutable
@dns.immutable.immutable
class L32(dns.rdata.Rdata):
"""L32 record"""
# see: rfc6742.txt
__slots__ = ['preference', 'locator32']
def __init__(self, rdclass, rdtype, preference, locator32):
super().__init__(rdclass, rdtype)
self.preference = self._as_uint16(preference)
self.locator32 = self._as_ipv4_address(locator32)
def to_text(self, origin=None, relativize=True, **kw):
return f'{self.preference} {self.locator32}'
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
preference = tok.get_uint16()
nodeid = tok.get_identifier()
return cls(rdclass, rdtype, preference, nodeid)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(struct.pack('!H', self.preference))
file.write(dns.ipv4.inet_aton(self.locator32))
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
preference = parser.get_uint16()
locator32 = parser.get_remaining()
return cls(rdclass, rdtype, preference, locator32)

View file

@ -0,0 +1,48 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
import struct
import dns.immutable
import dns.rdtypes.util
@dns.immutable.immutable
class L64(dns.rdata.Rdata):
"""L64 record"""
# see: rfc6742.txt
__slots__ = ['preference', 'locator64']
def __init__(self, rdclass, rdtype, preference, locator64):
super().__init__(rdclass, rdtype)
self.preference = self._as_uint16(preference)
if isinstance(locator64, bytes):
if len(locator64) != 8:
raise ValueError('invalid locator64')
self.locator64 = dns.rdata._hexify(locator64, 4, b':')
else:
dns.rdtypes.util.parse_formatted_hex(locator64, 4, 4, ':')
self.locator64 = locator64
def to_text(self, origin=None, relativize=True, **kw):
return f'{self.preference} {self.locator64}'
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
preference = tok.get_uint16()
locator64 = tok.get_identifier()
return cls(rdclass, rdtype, preference, locator64)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(struct.pack('!H', self.preference))
file.write(dns.rdtypes.util.parse_formatted_hex(self.locator64,
4, 4, ':'))
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
preference = parser.get_uint16()
locator64 = parser.get_remaining()
return cls(rdclass, rdtype, preference, locator64)

View file

@ -18,6 +18,7 @@
import struct
import dns.exception
import dns.immutable
import dns.rdata
@ -34,17 +35,13 @@ _MIN_LATITUDE = 0x80000000 - 90 * 3600000
_MAX_LONGITUDE = 0x80000000 + 180 * 3600000
_MIN_LONGITUDE = 0x80000000 - 180 * 3600000
# pylint complains about division since we don't have a from __future__ for
# it, but we don't care about python 2 warnings, so turn them off.
#
# pylint: disable=old-division
def _exponent_of(what, desc):
if what == 0:
return 0
exp = None
for (i, pow) in enumerate(_pows):
if what // pow == 0:
if what < pow:
exp = i - 1
break
if exp is None or exp < 0:
@ -58,7 +55,7 @@ def _float_to_tuple(what):
what *= -1
else:
sign = 1
what = round(what * 3600000) # pylint: disable=round-builtin
what = round(what * 3600000)
degrees = int(what // 3600000)
what -= degrees * 3600000
minutes = int(what // 60000)
@ -94,6 +91,20 @@ def _decode_size(what, desc):
return base * pow(10, exponent)
def _check_coordinate_list(value, low, high):
if value[0] < low or value[0] > high:
raise ValueError(f'not in range [{low}, {high}]')
if value[1] < 0 or value[1] > 59:
raise ValueError('bad minutes value')
if value[2] < 0 or value[2] > 59:
raise ValueError('bad seconds value')
if value[3] < 0 or value[3] > 999:
raise ValueError('bad milliseconds value')
if value[4] != 1 and value[4] != -1:
raise ValueError('bad hemisphere value')
@dns.immutable.immutable
class LOC(dns.rdata.Rdata):
"""LOC record"""
@ -119,16 +130,18 @@ class LOC(dns.rdata.Rdata):
latitude = float(latitude)
if isinstance(latitude, float):
latitude = _float_to_tuple(latitude)
object.__setattr__(self, 'latitude', dns.rdata._constify(latitude))
_check_coordinate_list(latitude, -90, 90)
self.latitude = tuple(latitude)
if isinstance(longitude, int):
longitude = float(longitude)
if isinstance(longitude, float):
longitude = _float_to_tuple(longitude)
object.__setattr__(self, 'longitude', dns.rdata._constify(longitude))
object.__setattr__(self, 'altitude', float(altitude))
object.__setattr__(self, 'size', float(size))
object.__setattr__(self, 'horizontal_precision', float(hprec))
object.__setattr__(self, 'vertical_precision', float(vprec))
_check_coordinate_list(longitude, -180, 180)
self.longitude = tuple(longitude)
self.altitude = float(altitude)
self.size = float(size)
self.horizontal_precision = float(hprec)
self.vertical_precision = float(vprec)
def to_text(self, origin=None, relativize=True, **kw):
if self.latitude[4] > 0:
@ -167,13 +180,9 @@ class LOC(dns.rdata.Rdata):
vprec = _default_vprec
latitude[0] = tok.get_int()
if latitude[0] > 90:
raise dns.exception.SyntaxError('latitude >= 90')
t = tok.get_string()
if t.isdigit():
latitude[1] = int(t)
if latitude[1] >= 60:
raise dns.exception.SyntaxError('latitude minutes >= 60')
t = tok.get_string()
if '.' in t:
(seconds, milliseconds) = t.split('.')
@ -181,8 +190,6 @@ class LOC(dns.rdata.Rdata):
raise dns.exception.SyntaxError(
'bad latitude seconds value')
latitude[2] = int(seconds)
if latitude[2] >= 60:
raise dns.exception.SyntaxError('latitude seconds >= 60')
l = len(milliseconds)
if l == 0 or l > 3 or not milliseconds.isdigit():
raise dns.exception.SyntaxError(
@ -204,13 +211,9 @@ class LOC(dns.rdata.Rdata):
raise dns.exception.SyntaxError('bad latitude hemisphere value')
longitude[0] = tok.get_int()
if longitude[0] > 180:
raise dns.exception.SyntaxError('longitude > 180')
t = tok.get_string()
if t.isdigit():
longitude[1] = int(t)
if longitude[1] >= 60:
raise dns.exception.SyntaxError('longitude minutes >= 60')
t = tok.get_string()
if '.' in t:
(seconds, milliseconds) = t.split('.')
@ -218,8 +221,6 @@ class LOC(dns.rdata.Rdata):
raise dns.exception.SyntaxError(
'bad longitude seconds value')
longitude[2] = int(seconds)
if longitude[2] >= 60:
raise dns.exception.SyntaxError('longitude seconds >= 60')
l = len(milliseconds)
if l == 0 or l > 3 or not milliseconds.isdigit():
raise dns.exception.SyntaxError(
@ -245,25 +246,22 @@ class LOC(dns.rdata.Rdata):
t = t[0: -1]
altitude = float(t) * 100.0 # m -> cm
token = tok.get().unescape()
if not token.is_eol_or_eof():
value = token.value
tokens = tok.get_remaining(max_tokens=3)
if len(tokens) >= 1:
value = tokens[0].unescape().value
if value[-1] == 'm':
value = value[0: -1]
size = float(value) * 100.0 # m -> cm
token = tok.get().unescape()
if not token.is_eol_or_eof():
value = token.value
if len(tokens) >= 2:
value = tokens[1].unescape().value
if value[-1] == 'm':
value = value[0: -1]
hprec = float(value) * 100.0 # m -> cm
token = tok.get().unescape()
if not token.is_eol_or_eof():
value = token.value
if len(tokens) >= 3:
value = tokens[2].unescape().value
if value[-1] == 'm':
value = value[0: -1]
vprec = float(value) * 100.0 # m -> cm
tok.get_eol()
# Try encoding these now so we raise if they are bad
_encode_size(size, "size")
@ -296,6 +294,8 @@ class LOC(dns.rdata.Rdata):
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(version, size, hprec, vprec, latitude, longitude, altitude) = \
parser.get_struct("!BBBBIII")
if version != 0:
raise dns.exception.FormError("LOC version not zero")
if latitude < _MIN_LATITUDE or latitude > _MAX_LATITUDE:
raise dns.exception.FormError("bad latitude")
if latitude > 0x80000000:

41
lib/dns/rdtypes/ANY/LP.py Normal file
View file

@ -0,0 +1,41 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
import struct
import dns.immutable
@dns.immutable.immutable
class LP(dns.rdata.Rdata):
"""LP record"""
# see: rfc6742.txt
__slots__ = ['preference', 'fqdn']
def __init__(self, rdclass, rdtype, preference, fqdn):
super().__init__(rdclass, rdtype)
self.preference = self._as_uint16(preference)
self.fqdn = self._as_name(fqdn)
def to_text(self, origin=None, relativize=True, **kw):
fqdn = self.fqdn.choose_relativity(origin, relativize)
return '%d %s' % (self.preference, fqdn)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
preference = tok.get_uint16()
fqdn = tok.get_name(origin, relativize, relativize_to)
return cls(rdclass, rdtype, preference, fqdn)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(struct.pack('!H', self.preference))
self.fqdn.to_wire(file, compress, origin, canonicalize)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
preference = parser.get_uint16()
fqdn = parser.get_name(origin)
return cls(rdclass, rdtype, preference, fqdn)

View file

@ -16,8 +16,10 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.mxbase
import dns.immutable
@dns.immutable.immutable
class MX(dns.rdtypes.mxbase.MXBase):
"""MX record"""

View file

@ -0,0 +1,47 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
import struct
import dns.immutable
import dns.rdtypes.util
@dns.immutable.immutable
class NID(dns.rdata.Rdata):
"""NID record"""
# see: rfc6742.txt
__slots__ = ['preference', 'nodeid']
def __init__(self, rdclass, rdtype, preference, nodeid):
super().__init__(rdclass, rdtype)
self.preference = self._as_uint16(preference)
if isinstance(nodeid, bytes):
if len(nodeid) != 8:
raise ValueError('invalid nodeid')
self.nodeid = dns.rdata._hexify(nodeid, 4, b':')
else:
dns.rdtypes.util.parse_formatted_hex(nodeid, 4, 4, ':')
self.nodeid = nodeid
def to_text(self, origin=None, relativize=True, **kw):
return f'{self.preference} {self.nodeid}'
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
preference = tok.get_uint16()
nodeid = tok.get_identifier()
return cls(rdclass, rdtype, preference, nodeid)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(struct.pack('!H', self.preference))
file.write(dns.rdtypes.util.parse_formatted_hex(self.nodeid, 4, 4, ':'))
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
preference = parser.get_uint16()
nodeid = parser.get_remaining()
return cls(rdclass, rdtype, preference, nodeid)

View file

@ -16,8 +16,10 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.txtbase
import dns.immutable
@dns.immutable.immutable
class NINFO(dns.rdtypes.txtbase.TXTBase):
"""NINFO record"""

View file

@ -16,8 +16,10 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.nsbase
import dns.immutable
@dns.immutable.immutable
class NS(dns.rdtypes.nsbase.NSBase):
"""NS record"""

View file

@ -16,16 +16,19 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.exception
import dns.immutable
import dns.rdata
import dns.rdatatype
import dns.name
import dns.rdtypes.util
@dns.immutable.immutable
class Bitmap(dns.rdtypes.util.Bitmap):
type_name = 'NSEC'
@dns.immutable.immutable
class NSEC(dns.rdata.Rdata):
"""NSEC record"""
@ -34,8 +37,10 @@ class NSEC(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, next, windows):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'next', next)
object.__setattr__(self, 'windows', dns.rdata._constify(windows))
self.next = self._as_name(next)
if not isinstance(windows, Bitmap):
windows = Bitmap(windows)
self.windows = tuple(windows.windows)
def to_text(self, origin=None, relativize=True, **kw):
next = self.next.choose_relativity(origin, relativize)
@ -46,15 +51,17 @@ class NSEC(dns.rdata.Rdata):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
next = tok.get_name(origin, relativize, relativize_to)
windows = Bitmap().from_text(tok)
windows = Bitmap.from_text(tok)
return cls(rdclass, rdtype, next, windows)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
# Note that NSEC downcasing, originally mandated by RFC 4034
# section 6.2 was removed by RFC 6840 section 5.1.
self.next.to_wire(file, None, origin, False)
Bitmap(self.windows).to_wire(file)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
next = parser.get_name(origin)
windows = Bitmap().from_wire_parser(parser)
return cls(rdclass, rdtype, next, windows)
bitmap = Bitmap.from_wire_parser(parser)
return cls(rdclass, rdtype, next, bitmap)

View file

@ -20,6 +20,7 @@ import binascii
import struct
import dns.exception
import dns.immutable
import dns.rdata
import dns.rdatatype
import dns.rdtypes.util
@ -37,10 +38,12 @@ SHA1 = 1
OPTOUT = 1
@dns.immutable.immutable
class Bitmap(dns.rdtypes.util.Bitmap):
type_name = 'NSEC3'
@dns.immutable.immutable
class NSEC3(dns.rdata.Rdata):
"""NSEC3 record"""
@ -50,15 +53,14 @@ class NSEC3(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, algorithm, flags, iterations, salt,
next, windows):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'flags', flags)
object.__setattr__(self, 'iterations', iterations)
if isinstance(salt, str):
object.__setattr__(self, 'salt', salt.encode())
else:
object.__setattr__(self, 'salt', salt)
object.__setattr__(self, 'next', next)
object.__setattr__(self, 'windows', dns.rdata._constify(windows))
self.algorithm = self._as_uint8(algorithm)
self.flags = self._as_uint8(flags)
self.iterations = self._as_uint16(iterations)
self.salt = self._as_bytes(salt, True, 255)
self.next = self._as_bytes(next, True, 255)
if not isinstance(windows, Bitmap):
windows = Bitmap(windows)
self.windows = tuple(windows.windows)
def to_text(self, origin=None, relativize=True, **kw):
next = base64.b32encode(self.next).translate(
@ -85,9 +87,9 @@ class NSEC3(dns.rdata.Rdata):
next = tok.get_string().encode(
'ascii').upper().translate(b32_hex_to_normal)
next = base64.b32decode(next)
windows = Bitmap().from_text(tok)
bitmap = Bitmap.from_text(tok)
return cls(rdclass, rdtype, algorithm, flags, iterations, salt, next,
windows)
bitmap)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
l = len(self.salt)
@ -104,6 +106,6 @@ class NSEC3(dns.rdata.Rdata):
(algorithm, flags, iterations) = parser.get_struct('!BBH')
salt = parser.get_counted_bytes()
next = parser.get_counted_bytes()
windows = Bitmap().from_wire_parser(parser)
bitmap = Bitmap.from_wire_parser(parser)
return cls(rdclass, rdtype, algorithm, flags, iterations, salt, next,
windows)
bitmap)

View file

@ -19,9 +19,11 @@ import struct
import binascii
import dns.exception
import dns.immutable
import dns.rdata
@dns.immutable.immutable
class NSEC3PARAM(dns.rdata.Rdata):
"""NSEC3PARAM record"""
@ -30,13 +32,10 @@ class NSEC3PARAM(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, algorithm, flags, iterations, salt):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'flags', flags)
object.__setattr__(self, 'iterations', iterations)
if isinstance(salt, str):
object.__setattr__(self, 'salt', salt.encode())
else:
object.__setattr__(self, 'salt', salt)
self.algorithm = self._as_uint8(algorithm)
self.flags = self._as_uint8(flags)
self.iterations = self._as_uint16(iterations)
self.salt = self._as_bytes(salt, True, 255)
def to_text(self, origin=None, relativize=True, **kw):
if self.salt == b'':
@ -57,7 +56,6 @@ class NSEC3PARAM(dns.rdata.Rdata):
salt = ''
else:
salt = binascii.unhexlify(salt.encode())
tok.get_eol()
return cls(rdclass, rdtype, algorithm, flags, iterations, salt)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):

View file

@ -18,9 +18,11 @@
import base64
import dns.exception
import dns.immutable
import dns.rdata
import dns.tokenizer
@dns.immutable.immutable
class OPENPGPKEY(dns.rdata.Rdata):
"""OPENPGPKEY record"""
@ -29,10 +31,10 @@ class OPENPGPKEY(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, key):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'key', key)
self.key = self._as_bytes(key)
def to_text(self, origin=None, relativize=True, **kw):
return dns.rdata._base64ify(self.key)
return dns.rdata._base64ify(self.key, chunksize=None, **kw)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,

View file

@ -18,10 +18,15 @@
import struct
import dns.edns
import dns.immutable
import dns.exception
import dns.rdata
# We don't implement from_text, and that's ok.
# pylint: disable=abstract-method
@dns.immutable.immutable
class OPT(dns.rdata.Rdata):
"""OPT record"""
@ -40,7 +45,11 @@ class OPT(dns.rdata.Rdata):
"""
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'options', dns.rdata._constify(options))
def as_option(option):
if not isinstance(option, dns.edns.Option):
raise ValueError('option is not a dns.edns.option')
return option
self.options = self._as_tuple(options, as_option)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
for opt in self.options:

View file

@ -16,8 +16,10 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.nsbase
import dns.immutable
@dns.immutable.immutable
class PTR(dns.rdtypes.nsbase.NSBase):
"""PTR record"""

View file

@ -16,10 +16,12 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.exception
import dns.immutable
import dns.rdata
import dns.name
@dns.immutable.immutable
class RP(dns.rdata.Rdata):
"""RP record"""
@ -30,8 +32,8 @@ class RP(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, mbox, txt):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'mbox', mbox)
object.__setattr__(self, 'txt', txt)
self.mbox = self._as_name(mbox)
self.txt = self._as_name(txt)
def to_text(self, origin=None, relativize=True, **kw):
mbox = self.mbox.choose_relativity(origin, relativize)
@ -43,7 +45,6 @@ class RP(dns.rdata.Rdata):
relativize_to=None):
mbox = tok.get_name(origin, relativize, relativize_to)
txt = tok.get_name(origin, relativize, relativize_to)
tok.get_eol()
return cls(rdclass, rdtype, mbox, txt)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):

View file

@ -21,6 +21,7 @@ import struct
import time
import dns.dnssec
import dns.immutable
import dns.exception
import dns.rdata
import dns.rdatatype
@ -50,6 +51,7 @@ def posixtime_to_sigtime(what):
return time.strftime('%Y%m%d%H%M%S', time.gmtime(what))
@dns.immutable.immutable
class RRSIG(dns.rdata.Rdata):
"""RRSIG record"""
@ -62,15 +64,15 @@ class RRSIG(dns.rdata.Rdata):
original_ttl, expiration, inception, key_tag, signer,
signature):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'type_covered', type_covered)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'labels', labels)
object.__setattr__(self, 'original_ttl', original_ttl)
object.__setattr__(self, 'expiration', expiration)
object.__setattr__(self, 'inception', inception)
object.__setattr__(self, 'key_tag', key_tag)
object.__setattr__(self, 'signer', signer)
object.__setattr__(self, 'signature', signature)
self.type_covered = self._as_rdatatype(type_covered)
self.algorithm = dns.dnssec.Algorithm.make(algorithm)
self.labels = self._as_uint8(labels)
self.original_ttl = self._as_ttl(original_ttl)
self.expiration = self._as_uint32(expiration)
self.inception = self._as_uint32(inception)
self.key_tag = self._as_uint16(key_tag)
self.signer = self._as_name(signer)
self.signature = self._as_bytes(signature)
def covers(self):
return self.type_covered
@ -85,7 +87,7 @@ class RRSIG(dns.rdata.Rdata):
posixtime_to_sigtime(self.inception),
self.key_tag,
self.signer.choose_relativity(origin, relativize),
dns.rdata._base64ify(self.signature)
dns.rdata._base64ify(self.signature, **kw)
)
@classmethod

View file

@ -16,8 +16,10 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.mxbase
import dns.immutable
@dns.immutable.immutable
class RT(dns.rdtypes.mxbase.UncompressedDowncasingMX):
"""RT record"""

View file

@ -0,0 +1,9 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
import dns.immutable
import dns.rdtypes.tlsabase
@dns.immutable.immutable
class SMIMEA(dns.rdtypes.tlsabase.TLSABase):
"""SMIMEA record"""

View file

@ -18,10 +18,12 @@
import struct
import dns.exception
import dns.immutable
import dns.rdata
import dns.name
@dns.immutable.immutable
class SOA(dns.rdata.Rdata):
"""SOA record"""
@ -34,13 +36,13 @@ class SOA(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, mname, rname, serial, refresh, retry,
expire, minimum):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'mname', mname)
object.__setattr__(self, 'rname', rname)
object.__setattr__(self, 'serial', serial)
object.__setattr__(self, 'refresh', refresh)
object.__setattr__(self, 'retry', retry)
object.__setattr__(self, 'expire', expire)
object.__setattr__(self, 'minimum', minimum)
self.mname = self._as_name(mname)
self.rname = self._as_name(rname)
self.serial = self._as_uint32(serial)
self.refresh = self._as_ttl(refresh)
self.retry = self._as_ttl(retry)
self.expire = self._as_ttl(expire)
self.minimum = self._as_ttl(minimum)
def to_text(self, origin=None, relativize=True, **kw):
mname = self.mname.choose_relativity(origin, relativize)
@ -59,7 +61,6 @@ class SOA(dns.rdata.Rdata):
retry = tok.get_ttl()
expire = tok.get_ttl()
minimum = tok.get_ttl()
tok.get_eol()
return cls(rdclass, rdtype, mname, rname, serial, refresh, retry,
expire, minimum)

View file

@ -16,8 +16,10 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.txtbase
import dns.immutable
@dns.immutable.immutable
class SPF(dns.rdtypes.txtbase.TXTBase):
"""SPF record"""

View file

@ -19,9 +19,11 @@ import struct
import binascii
import dns.rdata
import dns.immutable
import dns.rdatatype
@dns.immutable.immutable
class SSHFP(dns.rdata.Rdata):
"""SSHFP record"""
@ -33,15 +35,18 @@ class SSHFP(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, algorithm, fp_type,
fingerprint):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'fp_type', fp_type)
object.__setattr__(self, 'fingerprint', fingerprint)
self.algorithm = self._as_uint8(algorithm)
self.fp_type = self._as_uint8(fp_type)
self.fingerprint = self._as_bytes(fingerprint, True)
def to_text(self, origin=None, relativize=True, **kw):
kw = kw.copy()
chunksize = kw.pop('chunksize', 128)
return '%d %d %s' % (self.algorithm,
self.fp_type,
dns.rdata._hexify(self.fingerprint,
chunksize=128))
chunksize=chunksize,
**kw))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,

118
lib/dns/rdtypes/ANY/TKEY.py Normal file
View file

@ -0,0 +1,118 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2004-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import base64
import struct
import dns.dnssec
import dns.immutable
import dns.exception
import dns.rdata
@dns.immutable.immutable
class TKEY(dns.rdata.Rdata):
"""TKEY Record"""
__slots__ = ['algorithm', 'inception', 'expiration', 'mode', 'error',
'key', 'other']
def __init__(self, rdclass, rdtype, algorithm, inception, expiration,
mode, error, key, other=b''):
super().__init__(rdclass, rdtype)
self.algorithm = self._as_name(algorithm)
self.inception = self._as_uint32(inception)
self.expiration = self._as_uint32(expiration)
self.mode = self._as_uint16(mode)
self.error = self._as_uint16(error)
self.key = self._as_bytes(key)
self.other = self._as_bytes(other)
def to_text(self, origin=None, relativize=True, **kw):
_algorithm = self.algorithm.choose_relativity(origin, relativize)
text = '%s %u %u %u %u %s' % (str(_algorithm), self.inception,
self.expiration, self.mode, self.error,
dns.rdata._base64ify(self.key, 0))
if len(self.other) > 0:
text += ' %s' % (dns.rdata._base64ify(self.other, 0))
return text
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
algorithm = tok.get_name(relativize=False)
inception = tok.get_uint32()
expiration = tok.get_uint32()
mode = tok.get_uint16()
error = tok.get_uint16()
key_b64 = tok.get_string().encode()
key = base64.b64decode(key_b64)
other_b64 = tok.concatenate_remaining_identifiers().encode()
other = base64.b64decode(other_b64)
return cls(rdclass, rdtype, algorithm, inception, expiration, mode,
error, key, other)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
self.algorithm.to_wire(file, compress, origin)
file.write(struct.pack("!IIHH", self.inception, self.expiration,
self.mode, self.error))
file.write(struct.pack("!H", len(self.key)))
file.write(self.key)
file.write(struct.pack("!H", len(self.other)))
if len(self.other) > 0:
file.write(self.other)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
algorithm = parser.get_name(origin)
inception, expiration, mode, error = parser.get_struct("!IIHH")
key = parser.get_counted_bytes(2)
other = parser.get_counted_bytes(2)
return cls(rdclass, rdtype, algorithm, inception, expiration, mode,
error, key, other)
# Constants for the mode field - from RFC 2930:
# 2.5 The Mode Field
#
# The mode field specifies the general scheme for key agreement or
# the purpose of the TKEY DNS message. Servers and resolvers
# supporting this specification MUST implement the Diffie-Hellman key
# agreement mode and the key deletion mode for queries. All other
# modes are OPTIONAL. A server supporting TKEY that receives a TKEY
# request with a mode it does not support returns the BADMODE error.
# The following values of the Mode octet are defined, available, or
# reserved:
#
# Value Description
# ----- -----------
# 0 - reserved, see section 7
# 1 server assignment
# 2 Diffie-Hellman exchange
# 3 GSS-API negotiation
# 4 resolver assignment
# 5 key deletion
# 6-65534 - available, see section 7
# 65535 - reserved, see section 7
SERVER_ASSIGNMENT = 1
DIFFIE_HELLMAN_EXCHANGE = 2
GSSAPI_NEGOTIATION = 3
RESOLVER_ASSIGNMENT = 4
KEY_DELETION = 5

View file

@ -1,67 +1,10 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2005-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import struct
import binascii
import dns.rdata
import dns.rdatatype
import dns.immutable
import dns.rdtypes.tlsabase
class TLSA(dns.rdata.Rdata):
@dns.immutable.immutable
class TLSA(dns.rdtypes.tlsabase.TLSABase):
"""TLSA record"""
# see: RFC 6698
__slots__ = ['usage', 'selector', 'mtype', 'cert']
def __init__(self, rdclass, rdtype, usage, selector,
mtype, cert):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'usage', usage)
object.__setattr__(self, 'selector', selector)
object.__setattr__(self, 'mtype', mtype)
object.__setattr__(self, 'cert', cert)
def to_text(self, origin=None, relativize=True, **kw):
return '%d %d %d %s' % (self.usage,
self.selector,
self.mtype,
dns.rdata._hexify(self.cert,
chunksize=128))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
usage = tok.get_uint8()
selector = tok.get_uint8()
mtype = tok.get_uint8()
cert = tok.concatenate_remaining_identifiers().encode()
cert = binascii.unhexlify(cert)
return cls(rdclass, rdtype, usage, selector, mtype, cert)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
header = struct.pack("!BBB", self.usage, self.selector, self.mtype)
file.write(header)
file.write(self.cert)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
header = parser.get_struct("BBB")
cert = parser.get_remaining()
return cls(rdclass, rdtype, header[0], header[1], header[2], cert)

View file

@ -15,12 +15,16 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import base64
import struct
import dns.exception
import dns.immutable
import dns.rcode
import dns.rdata
@dns.immutable.immutable
class TSIG(dns.rdata.Rdata):
"""TSIG record"""
@ -52,20 +56,45 @@ class TSIG(dns.rdata.Rdata):
"""
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'time_signed', time_signed)
object.__setattr__(self, 'fudge', fudge)
object.__setattr__(self, 'mac', dns.rdata._constify(mac))
object.__setattr__(self, 'original_id', original_id)
object.__setattr__(self, 'error', error)
object.__setattr__(self, 'other', dns.rdata._constify(other))
self.algorithm = self._as_name(algorithm)
self.time_signed = self._as_uint48(time_signed)
self.fudge = self._as_uint16(fudge)
self.mac = self._as_bytes(mac)
self.original_id = self._as_uint16(original_id)
self.error = dns.rcode.Rcode.make(error)
self.other = self._as_bytes(other)
def to_text(self, origin=None, relativize=True, **kw):
algorithm = self.algorithm.choose_relativity(origin, relativize)
return f"{algorithm} {self.fudge} {self.time_signed} " + \
error = dns.rcode.to_text(self.error, True)
text = f"{algorithm} {self.time_signed} {self.fudge} " + \
f"{len(self.mac)} {dns.rdata._base64ify(self.mac, 0)} " + \
f"{self.original_id} {self.error} " + \
f"{len(self.other)} {dns.rdata._base64ify(self.other, 0)}"
f"{self.original_id} {error} {len(self.other)}"
if self.other:
text += f" {dns.rdata._base64ify(self.other, 0)}"
return text
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
algorithm = tok.get_name(relativize=False)
time_signed = tok.get_uint48()
fudge = tok.get_uint16()
mac_len = tok.get_uint16()
mac = base64.b64decode(tok.get_string())
if len(mac) != mac_len:
raise SyntaxError('invalid MAC')
original_id = tok.get_uint16()
error = dns.rcode.from_text(tok.get_string())
other_len = tok.get_uint16()
if other_len > 0:
other = base64.b64decode(tok.get_string())
if len(other) != other_len:
raise SyntaxError('invalid other data')
else:
other = b''
return cls(rdclass, rdtype, algorithm, time_signed, fudge, mac,
original_id, error, other)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
self.algorithm.to_wire(file, None, origin, False)
@ -81,9 +110,9 @@ class TSIG(dns.rdata.Rdata):
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
algorithm = parser.get_name(origin)
(time_hi, time_lo, fudge) = parser.get_struct('!HIH')
time_signed = (time_hi << 32) + time_lo
algorithm = parser.get_name()
time_signed = parser.get_uint48()
fudge = parser.get_uint16()
mac = parser.get_counted_bytes(2)
(original_id, error) = parser.get_struct('!HH')
other = parser.get_counted_bytes(2)

View file

@ -16,8 +16,10 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.txtbase
import dns.immutable
@dns.immutable.immutable
class TXT(dns.rdtypes.txtbase.TXTBase):
"""TXT record"""

View file

@ -19,10 +19,13 @@
import struct
import dns.exception
import dns.immutable
import dns.rdata
import dns.rdtypes.util
import dns.name
@dns.immutable.immutable
class URI(dns.rdata.Rdata):
"""URI record"""
@ -33,14 +36,11 @@ class URI(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, priority, weight, target):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'priority', priority)
object.__setattr__(self, 'weight', weight)
if len(target) < 1:
self.priority = self._as_uint16(priority)
self.weight = self._as_uint16(weight)
self.target = self._as_bytes(target, True)
if len(self.target) == 0:
raise dns.exception.SyntaxError("URI target cannot be empty")
if isinstance(target, str):
object.__setattr__(self, 'target', target.encode())
else:
object.__setattr__(self, 'target', target)
def to_text(self, origin=None, relativize=True, **kw):
return '%d %d "%s"' % (self.priority, self.weight,
@ -54,7 +54,6 @@ class URI(dns.rdata.Rdata):
target = tok.get().unescape()
if not (target.is_quoted_string() or target.is_identifier()):
raise dns.exception.SyntaxError("URI target must be a string")
tok.get_eol()
return cls(rdclass, rdtype, priority, weight, target.value)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
@ -69,3 +68,13 @@ class URI(dns.rdata.Rdata):
if len(target) == 0:
raise dns.exception.FormError('URI target may not be empty')
return cls(rdclass, rdtype, priority, weight, target)
def _processing_priority(self):
return self.priority
def _processing_weight(self):
return self.weight
@classmethod
def _processing_order(cls, iterable):
return dns.rdtypes.util.weighted_processing_order(iterable)

View file

@ -18,10 +18,12 @@
import struct
import dns.exception
import dns.immutable
import dns.rdata
import dns.tokenizer
@dns.immutable.immutable
class X25(dns.rdata.Rdata):
"""X25 record"""
@ -32,10 +34,7 @@ class X25(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, address):
super().__init__(rdclass, rdtype)
if isinstance(address, str):
object.__setattr__(self, 'address', address.encode())
else:
object.__setattr__(self, 'address', address)
self.address = self._as_bytes(address, True, 255)
def to_text(self, origin=None, relativize=True, **kw):
return '"%s"' % dns.rdata._escapify(self.address)
@ -44,7 +43,6 @@ class X25(dns.rdata.Rdata):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
address = tok.get_string()
tok.get_eol()
return cls(rdclass, rdtype, address)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):

View file

@ -0,0 +1,65 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
import struct
import binascii
import dns.immutable
import dns.rdata
import dns.rdatatype
import dns.zone
@dns.immutable.immutable
class ZONEMD(dns.rdata.Rdata):
"""ZONEMD record"""
# See RFC 8976
__slots__ = ['serial', 'scheme', 'hash_algorithm', 'digest']
def __init__(self, rdclass, rdtype, serial, scheme, hash_algorithm, digest):
super().__init__(rdclass, rdtype)
self.serial = self._as_uint32(serial)
self.scheme = dns.zone.DigestScheme.make(scheme)
self.hash_algorithm = dns.zone.DigestHashAlgorithm.make(hash_algorithm)
self.digest = self._as_bytes(digest)
if self.scheme == 0: # reserved, RFC 8976 Sec. 5.2
raise ValueError('scheme 0 is reserved')
if self.hash_algorithm == 0: # reserved, RFC 8976 Sec. 5.3
raise ValueError('hash_algorithm 0 is reserved')
hasher = dns.zone._digest_hashers.get(self.hash_algorithm)
if hasher and hasher().digest_size != len(self.digest):
raise ValueError('digest length inconsistent with hash algorithm')
def to_text(self, origin=None, relativize=True, **kw):
kw = kw.copy()
chunksize = kw.pop('chunksize', 128)
return '%d %d %d %s' % (self.serial, self.scheme, self.hash_algorithm,
dns.rdata._hexify(self.digest,
chunksize=chunksize,
**kw))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
serial = tok.get_uint32()
scheme = tok.get_uint8()
hash_algorithm = tok.get_uint8()
digest = tok.concatenate_remaining_identifiers().encode()
digest = binascii.unhexlify(digest)
return cls(rdclass, rdtype, serial, scheme, hash_algorithm, digest)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
header = struct.pack("!IBB", self.serial, self.scheme,
self.hash_algorithm)
file.write(header)
file.write(self.digest)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
header = parser.get_struct("!IBB")
digest = parser.get_remaining()
return cls(rdclass, rdtype, header[0], header[1], header[2], digest)

View file

@ -19,6 +19,7 @@
__all__ = [
'AFSDB',
'AMTRELAY',
'AVC',
'CAA',
'CDNSKEY',
@ -38,6 +39,7 @@ __all__ = [
'ISDN',
'LOC',
'MX',
'NINFO',
'NS',
'NSEC',
'NSEC3',
@ -48,12 +50,15 @@ __all__ = [
'RP',
'RRSIG',
'RT',
'SMIMEA',
'SOA',
'SPF',
'SSHFP',
'TKEY',
'TLSA',
'TSIG',
'TXT',
'URI',
'X25',
'ZONEMD',
]

View file

@ -15,9 +15,12 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.mxbase
import struct
import dns.rdtypes.mxbase
import dns.immutable
@dns.immutable.immutable
class A(dns.rdata.Rdata):
"""A record for Chaosnet"""
@ -29,8 +32,8 @@ class A(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, domain, address):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'domain', domain)
object.__setattr__(self, 'address', address)
self.domain = self._as_name(domain)
self.address = self._as_uint16(address)
def to_text(self, origin=None, relativize=True, **kw):
domain = self.domain.choose_relativity(origin, relativize)
@ -41,7 +44,6 @@ class A(dns.rdata.Rdata):
relativize_to=None):
domain = tok.get_name(origin, relativize, relativize_to)
address = tok.get_uint16(base=8)
tok.get_eol()
return cls(rdclass, rdtype, domain, address)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):

View file

@ -16,11 +16,13 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.exception
import dns.immutable
import dns.ipv4
import dns.rdata
import dns.tokenizer
@dns.immutable.immutable
class A(dns.rdata.Rdata):
"""A record."""
@ -29,9 +31,7 @@ class A(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, address):
super().__init__(rdclass, rdtype)
# check that it's OK
dns.ipv4.inet_aton(address)
object.__setattr__(self, 'address', address)
self.address = self._as_ipv4_address(address)
def to_text(self, origin=None, relativize=True, **kw):
return self.address
@ -40,7 +40,6 @@ class A(dns.rdata.Rdata):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
address = tok.get_identifier()
tok.get_eol()
return cls(rdclass, rdtype, address)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
@ -48,5 +47,5 @@ class A(dns.rdata.Rdata):
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
address = dns.ipv4.inet_ntoa(parser.get_remaining())
address = parser.get_remaining()
return cls(rdclass, rdtype, address)

View file

@ -16,11 +16,13 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.exception
import dns.immutable
import dns.ipv6
import dns.rdata
import dns.tokenizer
@dns.immutable.immutable
class AAAA(dns.rdata.Rdata):
"""AAAA record."""
@ -29,9 +31,7 @@ class AAAA(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, address):
super().__init__(rdclass, rdtype)
# check that it's OK
dns.ipv6.inet_aton(address)
object.__setattr__(self, 'address', address)
self.address = self._as_ipv6_address(address)
def to_text(self, origin=None, relativize=True, **kw):
return self.address
@ -40,7 +40,6 @@ class AAAA(dns.rdata.Rdata):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
address = tok.get_identifier()
tok.get_eol()
return cls(rdclass, rdtype, address)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
@ -48,5 +47,5 @@ class AAAA(dns.rdata.Rdata):
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
address = dns.ipv6.inet_ntoa(parser.get_remaining())
address = parser.get_remaining()
return cls(rdclass, rdtype, address)

View file

@ -20,11 +20,13 @@ import codecs
import struct
import dns.exception
import dns.immutable
import dns.ipv4
import dns.ipv6
import dns.rdata
import dns.tokenizer
@dns.immutable.immutable
class APLItem:
"""An APL list item."""
@ -32,10 +34,17 @@ class APLItem:
__slots__ = ['family', 'negation', 'address', 'prefix']
def __init__(self, family, negation, address, prefix):
self.family = family
self.negation = negation
self.address = address
self.prefix = prefix
self.family = dns.rdata.Rdata._as_uint16(family)
self.negation = dns.rdata.Rdata._as_bool(negation)
if self.family == 1:
self.address = dns.rdata.Rdata._as_ipv4_address(address)
self.prefix = dns.rdata.Rdata._as_int(prefix, 0, 32)
elif self.family == 2:
self.address = dns.rdata.Rdata._as_ipv6_address(address)
self.prefix = dns.rdata.Rdata._as_int(prefix, 0, 128)
else:
self.address = dns.rdata.Rdata._as_bytes(address, max_length=127)
self.prefix = dns.rdata.Rdata._as_uint8(prefix)
def __str__(self):
if self.negation:
@ -68,6 +77,7 @@ class APLItem:
file.write(address)
@dns.immutable.immutable
class APL(dns.rdata.Rdata):
"""APL record."""
@ -78,7 +88,10 @@ class APL(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, items):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'items', dns.rdata._constify(items))
for item in items:
if not isinstance(item, APLItem):
raise ValueError('item not an APLItem')
self.items = tuple(items)
def to_text(self, origin=None, relativize=True, **kw):
return ' '.join(map(str, self.items))
@ -87,11 +100,8 @@ class APL(dns.rdata.Rdata):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
items = []
while True:
token = tok.get().unescape()
if token.is_eol_or_eof():
break
item = token.value
for token in tok.get_remaining():
item = token.unescape().value
if item[0] == '!':
negation = True
item = item[1:]
@ -127,11 +137,9 @@ class APL(dns.rdata.Rdata):
if header[0] == 1:
if l < 4:
address += b'\x00' * (4 - l)
address = dns.ipv4.inet_ntoa(address)
elif header[0] == 2:
if l < 16:
address += b'\x00' * (16 - l)
address = dns.ipv6.inet_ntoa(address)
else:
#
# This isn't really right according to the RFC, but it

View file

@ -18,8 +18,10 @@
import base64
import dns.exception
import dns.immutable
@dns.immutable.immutable
class DHCID(dns.rdata.Rdata):
"""DHCID record"""
@ -30,10 +32,10 @@ class DHCID(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, data):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'data', data)
self.data = self._as_bytes(data)
def to_text(self, origin=None, relativize=True, **kw):
return dns.rdata._base64ify(self.data)
return dns.rdata._base64ify(self.data, **kw)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,

View file

@ -0,0 +1,8 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
import dns.rdtypes.svcbbase
import dns.immutable
@dns.immutable.immutable
class HTTPS(dns.rdtypes.svcbbase.SVCBBase):
"""HTTPS record"""

View file

@ -19,12 +19,14 @@ import struct
import base64
import dns.exception
import dns.immutable
import dns.rdtypes.util
class Gateway(dns.rdtypes.util.Gateway):
name = 'IPSECKEY gateway'
@dns.immutable.immutable
class IPSECKEY(dns.rdata.Rdata):
"""IPSECKEY record"""
@ -36,19 +38,19 @@ class IPSECKEY(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, precedence, gateway_type, algorithm,
gateway, key):
super().__init__(rdclass, rdtype)
Gateway(gateway_type, gateway).check()
object.__setattr__(self, 'precedence', precedence)
object.__setattr__(self, 'gateway_type', gateway_type)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'gateway', gateway)
object.__setattr__(self, 'key', key)
gateway = Gateway(gateway_type, gateway)
self.precedence = self._as_uint8(precedence)
self.gateway_type = gateway.type
self.algorithm = self._as_uint8(algorithm)
self.gateway = gateway.gateway
self.key = self._as_bytes(key)
def to_text(self, origin=None, relativize=True, **kw):
gateway = Gateway(self.gateway_type, self.gateway).to_text(origin,
relativize)
return '%d %d %d %s %s' % (self.precedence, self.gateway_type,
self.algorithm, gateway,
dns.rdata._base64ify(self.key))
dns.rdata._base64ify(self.key, **kw))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
@ -56,12 +58,12 @@ class IPSECKEY(dns.rdata.Rdata):
precedence = tok.get_uint8()
gateway_type = tok.get_uint8()
algorithm = tok.get_uint8()
gateway = Gateway(gateway_type).from_text(tok, origin, relativize,
relativize_to)
gateway = Gateway.from_text(gateway_type, tok, origin, relativize,
relativize_to)
b64 = tok.concatenate_remaining_identifiers().encode()
key = base64.b64decode(b64)
return cls(rdclass, rdtype, precedence, gateway_type, algorithm,
gateway, key)
gateway.gateway, key)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
header = struct.pack("!BBB", self.precedence, self.gateway_type,
@ -75,7 +77,7 @@ class IPSECKEY(dns.rdata.Rdata):
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
header = parser.get_struct('!BBB')
gateway_type = header[1]
gateway = Gateway(gateway_type).from_wire_parser(parser, origin)
gateway = Gateway.from_wire_parser(gateway_type, parser, origin)
key = parser.get_remaining()
return cls(rdclass, rdtype, header[0], gateway_type, header[2],
gateway, key)
gateway.gateway, key)

View file

@ -16,8 +16,10 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.mxbase
import dns.immutable
@dns.immutable.immutable
class KX(dns.rdtypes.mxbase.UncompressedDowncasingMX):
"""KX record"""

View file

@ -18,8 +18,10 @@
import struct
import dns.exception
import dns.immutable
import dns.name
import dns.rdata
import dns.rdtypes.util
def _write_string(file, s):
@ -29,12 +31,7 @@ def _write_string(file, s):
file.write(s)
def _sanitize(value):
if isinstance(value, str):
return value.encode()
return value
@dns.immutable.immutable
class NAPTR(dns.rdata.Rdata):
"""NAPTR record"""
@ -47,12 +44,12 @@ class NAPTR(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, order, preference, flags, service,
regexp, replacement):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'flags', _sanitize(flags))
object.__setattr__(self, 'service', _sanitize(service))
object.__setattr__(self, 'regexp', _sanitize(regexp))
object.__setattr__(self, 'order', order)
object.__setattr__(self, 'preference', preference)
object.__setattr__(self, 'replacement', replacement)
self.flags = self._as_bytes(flags, True, 255)
self.service = self._as_bytes(service, True, 255)
self.regexp = self._as_bytes(regexp, True, 255)
self.order = self._as_uint16(order)
self.preference = self._as_uint16(preference)
self.replacement = self._as_name(replacement)
def to_text(self, origin=None, relativize=True, **kw):
replacement = self.replacement.choose_relativity(origin, relativize)
@ -72,7 +69,6 @@ class NAPTR(dns.rdata.Rdata):
service = tok.get_string()
regexp = tok.get_string()
replacement = tok.get_name(origin, relativize, relativize_to)
tok.get_eol()
return cls(rdclass, rdtype, order, preference, flags, service,
regexp, replacement)
@ -88,9 +84,16 @@ class NAPTR(dns.rdata.Rdata):
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(order, preference) = parser.get_struct('!HH')
strings = []
for i in range(3):
for _ in range(3):
s = parser.get_counted_bytes()
strings.append(s)
replacement = parser.get_name(origin)
return cls(rdclass, rdtype, order, preference, strings[0], strings[1],
strings[2], replacement)
def _processing_priority(self):
return (self.order, self.preference)
@classmethod
def _processing_order(cls, iterable):
return dns.rdtypes.util.priority_processing_order(iterable)

View file

@ -18,10 +18,12 @@
import binascii
import dns.exception
import dns.immutable
import dns.rdata
import dns.tokenizer
@dns.immutable.immutable
class NSAP(dns.rdata.Rdata):
"""NSAP record."""
@ -32,7 +34,7 @@ class NSAP(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, address):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'address', address)
self.address = self._as_bytes(address)
def to_text(self, origin=None, relativize=True, **kw):
return "0x%s" % binascii.hexlify(self.address).decode()
@ -41,7 +43,6 @@ class NSAP(dns.rdata.Rdata):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
address = tok.get_string()
tok.get_eol()
if address[0:2] != '0x':
raise dns.exception.SyntaxError('string does not start with 0x')
address = address[2:].replace('.', '')

View file

@ -16,8 +16,10 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.nsbase
import dns.immutable
@dns.immutable.immutable
class NSAP_PTR(dns.rdtypes.nsbase.UncompressedNS):
"""NSAP-PTR record"""

View file

@ -18,10 +18,13 @@
import struct
import dns.exception
import dns.immutable
import dns.rdata
import dns.rdtypes.util
import dns.name
@dns.immutable.immutable
class PX(dns.rdata.Rdata):
"""PX record."""
@ -32,9 +35,9 @@ class PX(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, preference, map822, mapx400):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'preference', preference)
object.__setattr__(self, 'map822', map822)
object.__setattr__(self, 'mapx400', mapx400)
self.preference = self._as_uint16(preference)
self.map822 = self._as_name(map822)
self.mapx400 = self._as_name(mapx400)
def to_text(self, origin=None, relativize=True, **kw):
map822 = self.map822.choose_relativity(origin, relativize)
@ -47,7 +50,6 @@ class PX(dns.rdata.Rdata):
preference = tok.get_uint16()
map822 = tok.get_name(origin, relativize, relativize_to)
mapx400 = tok.get_name(origin, relativize, relativize_to)
tok.get_eol()
return cls(rdclass, rdtype, preference, map822, mapx400)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
@ -62,3 +64,10 @@ class PX(dns.rdata.Rdata):
map822 = parser.get_name(origin)
mapx400 = parser.get_name(origin)
return cls(rdclass, rdtype, preference, map822, mapx400)
def _processing_priority(self):
return self.preference
@classmethod
def _processing_order(cls, iterable):
return dns.rdtypes.util.priority_processing_order(iterable)

View file

@ -18,10 +18,13 @@
import struct
import dns.exception
import dns.immutable
import dns.rdata
import dns.rdtypes.util
import dns.name
@dns.immutable.immutable
class SRV(dns.rdata.Rdata):
"""SRV record"""
@ -32,10 +35,10 @@ class SRV(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, priority, weight, port, target):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'priority', priority)
object.__setattr__(self, 'weight', weight)
object.__setattr__(self, 'port', port)
object.__setattr__(self, 'target', target)
self.priority = self._as_uint16(priority)
self.weight = self._as_uint16(weight)
self.port = self._as_uint16(port)
self.target = self._as_name(target)
def to_text(self, origin=None, relativize=True, **kw):
target = self.target.choose_relativity(origin, relativize)
@ -49,7 +52,6 @@ class SRV(dns.rdata.Rdata):
weight = tok.get_uint16()
port = tok.get_uint16()
target = tok.get_name(origin, relativize, relativize_to)
tok.get_eol()
return cls(rdclass, rdtype, priority, weight, port, target)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
@ -62,3 +64,13 @@ class SRV(dns.rdata.Rdata):
(priority, weight, port) = parser.get_struct('!HHH')
target = parser.get_name(origin)
return cls(rdclass, rdtype, priority, weight, port, target)
def _processing_priority(self):
return self.priority
def _processing_weight(self):
return self.weight
@classmethod
def _processing_order(cls, iterable):
return dns.rdtypes.util.weighted_processing_order(iterable)

View file

@ -0,0 +1,8 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
import dns.rdtypes.svcbbase
import dns.immutable
@dns.immutable.immutable
class SVCB(dns.rdtypes.svcbbase.SVCBBase):
"""SVCB record"""

View file

@ -19,12 +19,18 @@ import socket
import struct
import dns.ipv4
import dns.immutable
import dns.rdata
_proto_tcp = socket.getprotobyname('tcp')
_proto_udp = socket.getprotobyname('udp')
try:
_proto_tcp = socket.getprotobyname('tcp')
_proto_udp = socket.getprotobyname('udp')
except OSError:
# Fall back to defaults in case /etc/protocols is unavailable.
_proto_tcp = 6
_proto_udp = 17
@dns.immutable.immutable
class WKS(dns.rdata.Rdata):
"""WKS record"""
@ -35,14 +41,13 @@ class WKS(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, address, protocol, bitmap):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'address', address)
object.__setattr__(self, 'protocol', protocol)
object.__setattr__(self, 'bitmap', dns.rdata._constify(bitmap))
self.address = self._as_ipv4_address(address)
self.protocol = self._as_uint8(protocol)
self.bitmap = self._as_bytes(bitmap)
def to_text(self, origin=None, relativize=True, **kw):
bits = []
for i in range(0, len(self.bitmap)):
byte = self.bitmap[i]
for i, byte in enumerate(self.bitmap):
for j in range(0, 8):
if byte & (0x80 >> j):
bits.append(str(i * 8 + j))
@ -59,12 +64,10 @@ class WKS(dns.rdata.Rdata):
else:
protocol = socket.getprotobyname(protocol)
bitmap = bytearray()
while 1:
token = tok.get().unescape()
if token.is_eol_or_eof():
break
if token.value.isdigit():
serv = int(token.value)
for token in tok.get_remaining():
value = token.unescape().value
if value.isdigit():
serv = int(value)
else:
if protocol != _proto_udp and protocol != _proto_tcp:
raise NotImplementedError("protocol must be TCP or UDP")
@ -72,11 +75,11 @@ class WKS(dns.rdata.Rdata):
protocol_text = "udp"
else:
protocol_text = "tcp"
serv = socket.getservbyname(token.value, protocol_text)
serv = socket.getservbyname(value, protocol_text)
i = serv // 8
l = len(bitmap)
if l < i + 1:
for j in range(l, i + 1):
for _ in range(l, i + 1):
bitmap.append(0)
bitmap[i] = bitmap[i] | (0x80 >> (serv % 8))
bitmap = dns.rdata._truncate_bitmap(bitmap)
@ -90,7 +93,7 @@ class WKS(dns.rdata.Rdata):
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
address = dns.ipv4.inet_ntoa(parser.get_bytes(4))
address = parser.get_bytes(4)
protocol = parser.get_uint8()
bitmap = parser.get_remaining()
return cls(rdclass, rdtype, address, protocol, bitmap)

View file

@ -22,6 +22,7 @@ __all__ = [
'AAAA',
'APL',
'DHCID',
'HTTPS',
'IPSECKEY',
'KX',
'NAPTR',
@ -29,5 +30,6 @@ __all__ = [
'NSAP_PTR',
'PX',
'SRV',
'SVCB',
'WKS',
]

View file

@ -21,8 +21,13 @@ __all__ = [
'ANY',
'IN',
'CH',
'dnskeybase',
'dsbase',
'euibase',
'mxbase',
'nsbase',
'svcbbase',
'tlsabase',
'txtbase',
'util'
]

View file

@ -20,6 +20,7 @@ import enum
import struct
import dns.exception
import dns.immutable
import dns.dnssec
import dns.rdata
@ -31,9 +32,8 @@ class Flag(enum.IntFlag):
REVOKE = 0x0080
ZONE = 0x0100
globals().update(Flag.__members__)
@dns.immutable.immutable
class DNSKEYBase(dns.rdata.Rdata):
"""Base class for rdata that is like a DNSKEY record"""
@ -42,21 +42,21 @@ class DNSKEYBase(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, flags, protocol, algorithm, key):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'flags', flags)
object.__setattr__(self, 'protocol', protocol)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'key', key)
self.flags = self._as_uint16(flags)
self.protocol = self._as_uint8(protocol)
self.algorithm = dns.dnssec.Algorithm.make(algorithm)
self.key = self._as_bytes(key)
def to_text(self, origin=None, relativize=True, **kw):
return '%d %d %d %s' % (self.flags, self.protocol, self.algorithm,
dns.rdata._base64ify(self.key))
dns.rdata._base64ify(self.key, **kw))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
flags = tok.get_uint16()
protocol = tok.get_uint8()
algorithm = dns.dnssec.algorithm_from_text(tok.get_string())
algorithm = tok.get_string()
b64 = tok.concatenate_remaining_identifiers().encode()
key = base64.b64decode(b64)
return cls(rdclass, rdtype, flags, protocol, algorithm, key)
@ -72,3 +72,11 @@ class DNSKEYBase(dns.rdata.Rdata):
key = parser.get_remaining()
return cls(rdclass, rdtype, header[0], header[1], header[2],
key)
### BEGIN generated Flag constants
SEP = Flag.SEP
REVOKE = Flag.REVOKE
ZONE = Flag.ZONE
### END generated Flag constants

View file

@ -0,0 +1,38 @@
from typing import Set, Any
SEP : int
REVOKE : int
ZONE : int
def flags_to_text_set(flags : int) -> Set[str]:
...
def flags_from_text_set(texts_set) -> int:
...
from .. import rdata
class DNSKEYBase(rdata.Rdata):
def __init__(self, rdclass, rdtype, flags, protocol, algorithm, key):
self.flags : int
self.protocol : int
self.key : str
self.algorithm : int
def to_text(self, origin : Any = None, relativize=True, **kw : Any):
...
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
...
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
...
@classmethod
def from_parser(cls, rdclass, rdtype, parser, origin=None):
...
def flags_to_text_set(self) -> Set[str]:
...

View file

@ -19,35 +19,54 @@ import struct
import binascii
import dns.dnssec
import dns.immutable
import dns.rdata
import dns.rdatatype
@dns.immutable.immutable
class DSBase(dns.rdata.Rdata):
"""Base class for rdata that is like a DS record"""
__slots__ = ['key_tag', 'algorithm', 'digest_type', 'digest']
# Digest types registry: https://www.iana.org/assignments/ds-rr-types/ds-rr-types.xhtml
_digest_length_by_type = {
1: 20, # SHA-1, RFC 3658 Sec. 2.4
2: 32, # SHA-256, RFC 4509 Sec. 2.2
3: 32, # GOST R 34.11-94, RFC 5933 Sec. 4 in conjunction with RFC 4490 Sec. 2.1
4: 48, # SHA-384, RFC 6605 Sec. 2
}
def __init__(self, rdclass, rdtype, key_tag, algorithm, digest_type,
digest):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'key_tag', key_tag)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'digest_type', digest_type)
object.__setattr__(self, 'digest', digest)
self.key_tag = self._as_uint16(key_tag)
self.algorithm = dns.dnssec.Algorithm.make(algorithm)
self.digest_type = self._as_uint8(digest_type)
self.digest = self._as_bytes(digest)
try:
if len(self.digest) != self._digest_length_by_type[self.digest_type]:
raise ValueError('digest length inconsistent with digest type')
except KeyError:
if self.digest_type == 0: # reserved, RFC 3658 Sec. 2.4
raise ValueError('digest type 0 is reserved')
def to_text(self, origin=None, relativize=True, **kw):
kw = kw.copy()
chunksize = kw.pop('chunksize', 128)
return '%d %d %d %s' % (self.key_tag, self.algorithm,
self.digest_type,
dns.rdata._hexify(self.digest,
chunksize=128))
chunksize=chunksize,
**kw))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
key_tag = tok.get_uint16()
algorithm = dns.dnssec.algorithm_from_text(tok.get_string())
algorithm = tok.get_string()
digest_type = tok.get_uint8()
digest = tok.concatenate_remaining_identifiers().encode()
digest = binascii.unhexlify(digest)

View file

@ -17,8 +17,10 @@
import binascii
import dns.rdata
import dns.immutable
@dns.immutable.immutable
class EUIBase(dns.rdata.Rdata):
"""EUIxx record"""
@ -32,19 +34,18 @@ class EUIBase(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, eui):
super().__init__(rdclass, rdtype)
if len(eui) != self.byte_len:
self.eui = self._as_bytes(eui)
if len(self.eui) != self.byte_len:
raise dns.exception.FormError('EUI%s rdata has to have %s bytes'
% (self.byte_len * 8, self.byte_len))
object.__setattr__(self, 'eui', eui)
def to_text(self, origin=None, relativize=True, **kw):
return dns.rdata._hexify(self.eui, chunksize=2).replace(' ', '-')
return dns.rdata._hexify(self.eui, chunksize=2, separator=b'-', **kw)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
text = tok.get_string()
tok.get_eol()
if len(text) != cls.text_len:
raise dns.exception.SyntaxError(
'Input text must have %s characters' % cls.text_len)

View file

@ -20,10 +20,13 @@
import struct
import dns.exception
import dns.immutable
import dns.rdata
import dns.name
import dns.rdtypes.util
@dns.immutable.immutable
class MXBase(dns.rdata.Rdata):
"""Base class for rdata that is like an MX record."""
@ -32,8 +35,8 @@ class MXBase(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, preference, exchange):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'preference', preference)
object.__setattr__(self, 'exchange', exchange)
self.preference = self._as_uint16(preference)
self.exchange = self._as_name(exchange)
def to_text(self, origin=None, relativize=True, **kw):
exchange = self.exchange.choose_relativity(origin, relativize)
@ -44,7 +47,6 @@ class MXBase(dns.rdata.Rdata):
relativize_to=None):
preference = tok.get_uint16()
exchange = tok.get_name(origin, relativize, relativize_to)
tok.get_eol()
return cls(rdclass, rdtype, preference, exchange)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
@ -58,7 +60,15 @@ class MXBase(dns.rdata.Rdata):
exchange = parser.get_name(origin)
return cls(rdclass, rdtype, preference, exchange)
def _processing_priority(self):
return self.preference
@classmethod
def _processing_order(cls, iterable):
return dns.rdtypes.util.priority_processing_order(iterable)
@dns.immutable.immutable
class UncompressedMX(MXBase):
"""Base class for rdata that is like an MX record, but whose name
@ -69,6 +79,7 @@ class UncompressedMX(MXBase):
super()._to_wire(file, None, origin, False)
@dns.immutable.immutable
class UncompressedDowncasingMX(MXBase):
"""Base class for rdata that is like an MX record, but whose name

View file

@ -18,10 +18,12 @@
"""NS-like base classes."""
import dns.exception
import dns.immutable
import dns.rdata
import dns.name
@dns.immutable.immutable
class NSBase(dns.rdata.Rdata):
"""Base class for rdata that is like an NS record."""
@ -30,7 +32,7 @@ class NSBase(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, target):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'target', target)
self.target = self._as_name(target)
def to_text(self, origin=None, relativize=True, **kw):
target = self.target.choose_relativity(origin, relativize)
@ -40,7 +42,6 @@ class NSBase(dns.rdata.Rdata):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
target = tok.get_name(origin, relativize, relativize_to)
tok.get_eol()
return cls(rdclass, rdtype, target)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
@ -52,6 +53,7 @@ class NSBase(dns.rdata.Rdata):
return cls(rdclass, rdtype, target)
@dns.immutable.immutable
class UncompressedNS(NSBase):
"""Base class for rdata that is like an NS record, but whose name

555
lib/dns/rdtypes/svcbbase.py Normal file
View file

@ -0,0 +1,555 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
import base64
import enum
import io
import struct
import dns.enum
import dns.exception
import dns.immutable
import dns.ipv4
import dns.ipv6
import dns.name
import dns.rdata
import dns.rdtypes.util
import dns.tokenizer
import dns.wire
# Until there is an RFC, this module is experimental and may be changed in
# incompatible ways.
class UnknownParamKey(dns.exception.DNSException):
"""Unknown SVCB ParamKey"""
class ParamKey(dns.enum.IntEnum):
"""SVCB ParamKey"""
MANDATORY = 0
ALPN = 1
NO_DEFAULT_ALPN = 2
PORT = 3
IPV4HINT = 4
ECH = 5
IPV6HINT = 6
@classmethod
def _maximum(cls):
return 65535
@classmethod
def _short_name(cls):
return "SVCBParamKey"
@classmethod
def _prefix(cls):
return "KEY"
@classmethod
def _unknown_exception_class(cls):
return UnknownParamKey
class Emptiness(enum.IntEnum):
NEVER = 0
ALWAYS = 1
ALLOWED = 2
def _validate_key(key):
force_generic = False
if isinstance(key, bytes):
# We decode to latin-1 so we get 0-255 as valid and do NOT interpret
# UTF-8 sequences
key = key.decode('latin-1')
if isinstance(key, str):
if key.lower().startswith('key'):
force_generic = True
if key[3:].startswith('0') and len(key) != 4:
# key has leading zeros
raise ValueError('leading zeros in key')
key = key.replace('-', '_')
return (ParamKey.make(key), force_generic)
def key_to_text(key):
return ParamKey.to_text(key).replace('_', '-').lower()
# Like rdata escapify, but escapes ',' too.
_escaped = b'",\\'
def _escapify(qstring):
text = ''
for c in qstring:
if c in _escaped:
text += '\\' + chr(c)
elif c >= 0x20 and c < 0x7F:
text += chr(c)
else:
text += '\\%03d' % c
return text
def _unescape(value):
if value == '':
return value
unescaped = b''
l = len(value)
i = 0
while i < l:
c = value[i]
i += 1
if c == '\\':
if i >= l: # pragma: no cover (can't happen via tokenizer get())
raise dns.exception.UnexpectedEnd
c = value[i]
i += 1
if c.isdigit():
if i >= l:
raise dns.exception.UnexpectedEnd
c2 = value[i]
i += 1
if i >= l:
raise dns.exception.UnexpectedEnd
c3 = value[i]
i += 1
if not (c2.isdigit() and c3.isdigit()):
raise dns.exception.SyntaxError
codepoint = int(c) * 100 + int(c2) * 10 + int(c3)
if codepoint > 255:
raise dns.exception.SyntaxError
unescaped += b'%c' % (codepoint)
continue
unescaped += c.encode()
return unescaped
def _split(value):
l = len(value)
i = 0
items = []
unescaped = b''
while i < l:
c = value[i]
i += 1
if c == ord('\\'):
if i >= l: # pragma: no cover (can't happen via tokenizer get())
raise dns.exception.UnexpectedEnd
c = value[i]
i += 1
unescaped += b'%c' % (c)
elif c == ord(','):
items.append(unescaped)
unescaped = b''
else:
unescaped += b'%c' % (c)
items.append(unescaped)
return items
@dns.immutable.immutable
class Param:
"""Abstract base class for SVCB parameters"""
@classmethod
def emptiness(cls):
return Emptiness.NEVER
@dns.immutable.immutable
class GenericParam(Param):
"""Generic SVCB parameter
"""
def __init__(self, value):
self.value = dns.rdata.Rdata._as_bytes(value, True)
@classmethod
def emptiness(cls):
return Emptiness.ALLOWED
@classmethod
def from_value(cls, value):
if value is None or len(value) == 0:
return None
else:
return cls(_unescape(value))
def to_text(self):
return '"' + dns.rdata._escapify(self.value) + '"'
@classmethod
def from_wire_parser(cls, parser, origin=None): # pylint: disable=W0613
value = parser.get_bytes(parser.remaining())
if len(value) == 0:
return None
else:
return cls(value)
def to_wire(self, file, origin=None): # pylint: disable=W0613
file.write(self.value)
@dns.immutable.immutable
class MandatoryParam(Param):
def __init__(self, keys):
# check for duplicates
keys = sorted([_validate_key(key)[0] for key in keys])
prior_k = None
for k in keys:
if k == prior_k:
raise ValueError(f'duplicate key {k:d}')
prior_k = k
if k == ParamKey.MANDATORY:
raise ValueError('listed the mandatory key as mandatory')
self.keys = tuple(keys)
@classmethod
def from_value(cls, value):
keys = [k.encode() for k in value.split(',')]
return cls(keys)
def to_text(self):
return '"' + ','.join([key_to_text(key) for key in self.keys]) + '"'
@classmethod
def from_wire_parser(cls, parser, origin=None): # pylint: disable=W0613
keys = []
last_key = -1
while parser.remaining() > 0:
key = parser.get_uint16()
if key < last_key:
raise dns.exception.FormError('manadatory keys not ascending')
last_key = key
keys.append(key)
return cls(keys)
def to_wire(self, file, origin=None): # pylint: disable=W0613
for key in self.keys:
file.write(struct.pack('!H', key))
@dns.immutable.immutable
class ALPNParam(Param):
def __init__(self, ids):
self.ids = dns.rdata.Rdata._as_tuple(
ids, lambda x: dns.rdata.Rdata._as_bytes(x, True, 255, False))
@classmethod
def from_value(cls, value):
return cls(_split(_unescape(value)))
def to_text(self):
value = ','.join([_escapify(id) for id in self.ids])
return '"' + dns.rdata._escapify(value.encode()) + '"'
@classmethod
def from_wire_parser(cls, parser, origin=None): # pylint: disable=W0613
ids = []
while parser.remaining() > 0:
id = parser.get_counted_bytes()
ids.append(id)
return cls(ids)
def to_wire(self, file, origin=None): # pylint: disable=W0613
for id in self.ids:
file.write(struct.pack('!B', len(id)))
file.write(id)
@dns.immutable.immutable
class NoDefaultALPNParam(Param):
# We don't ever expect to instantiate this class, but we need
# a from_value() and a from_wire_parser(), so we just return None
# from the class methods when things are OK.
@classmethod
def emptiness(cls):
return Emptiness.ALWAYS
@classmethod
def from_value(cls, value):
if value is None or value == '':
return None
else:
raise ValueError('no-default-alpn with non-empty value')
def to_text(self):
raise NotImplementedError # pragma: no cover
@classmethod
def from_wire_parser(cls, parser, origin=None): # pylint: disable=W0613
if parser.remaining() != 0:
raise dns.exception.FormError
return None
def to_wire(self, file, origin=None): # pylint: disable=W0613
raise NotImplementedError # pragma: no cover
@dns.immutable.immutable
class PortParam(Param):
def __init__(self, port):
self.port = dns.rdata.Rdata._as_uint16(port)
@classmethod
def from_value(cls, value):
value = int(value)
return cls(value)
def to_text(self):
return f'"{self.port}"'
@classmethod
def from_wire_parser(cls, parser, origin=None): # pylint: disable=W0613
port = parser.get_uint16()
return cls(port)
def to_wire(self, file, origin=None): # pylint: disable=W0613
file.write(struct.pack('!H', self.port))
@dns.immutable.immutable
class IPv4HintParam(Param):
def __init__(self, addresses):
self.addresses = dns.rdata.Rdata._as_tuple(
addresses, dns.rdata.Rdata._as_ipv4_address)
@classmethod
def from_value(cls, value):
addresses = value.split(',')
return cls(addresses)
def to_text(self):
return '"' + ','.join(self.addresses) + '"'
@classmethod
def from_wire_parser(cls, parser, origin=None): # pylint: disable=W0613
addresses = []
while parser.remaining() > 0:
ip = parser.get_bytes(4)
addresses.append(dns.ipv4.inet_ntoa(ip))
return cls(addresses)
def to_wire(self, file, origin=None): # pylint: disable=W0613
for address in self.addresses:
file.write(dns.ipv4.inet_aton(address))
@dns.immutable.immutable
class IPv6HintParam(Param):
def __init__(self, addresses):
self.addresses = dns.rdata.Rdata._as_tuple(
addresses, dns.rdata.Rdata._as_ipv6_address)
@classmethod
def from_value(cls, value):
addresses = value.split(',')
return cls(addresses)
def to_text(self):
return '"' + ','.join(self.addresses) + '"'
@classmethod
def from_wire_parser(cls, parser, origin=None): # pylint: disable=W0613
addresses = []
while parser.remaining() > 0:
ip = parser.get_bytes(16)
addresses.append(dns.ipv6.inet_ntoa(ip))
return cls(addresses)
def to_wire(self, file, origin=None): # pylint: disable=W0613
for address in self.addresses:
file.write(dns.ipv6.inet_aton(address))
@dns.immutable.immutable
class ECHParam(Param):
def __init__(self, ech):
self.ech = dns.rdata.Rdata._as_bytes(ech, True)
@classmethod
def from_value(cls, value):
if '\\' in value:
raise ValueError('escape in ECH value')
value = base64.b64decode(value.encode())
return cls(value)
def to_text(self):
b64 = base64.b64encode(self.ech).decode('ascii')
return f'"{b64}"'
@classmethod
def from_wire_parser(cls, parser, origin=None): # pylint: disable=W0613
value = parser.get_bytes(parser.remaining())
return cls(value)
def to_wire(self, file, origin=None): # pylint: disable=W0613
file.write(self.ech)
_class_for_key = {
ParamKey.MANDATORY: MandatoryParam,
ParamKey.ALPN: ALPNParam,
ParamKey.NO_DEFAULT_ALPN: NoDefaultALPNParam,
ParamKey.PORT: PortParam,
ParamKey.IPV4HINT: IPv4HintParam,
ParamKey.ECH: ECHParam,
ParamKey.IPV6HINT: IPv6HintParam,
}
def _validate_and_define(params, key, value):
(key, force_generic) = _validate_key(_unescape(key))
if key in params:
raise SyntaxError(f'duplicate key "{key:d}"')
cls = _class_for_key.get(key, GenericParam)
emptiness = cls.emptiness()
if value is None:
if emptiness == Emptiness.NEVER:
raise SyntaxError('value cannot be empty')
value = cls.from_value(value)
else:
if force_generic:
value = cls.from_wire_parser(dns.wire.Parser(_unescape(value)))
else:
value = cls.from_value(value)
params[key] = value
@dns.immutable.immutable
class SVCBBase(dns.rdata.Rdata):
"""Base class for SVCB-like records"""
# see: draft-ietf-dnsop-svcb-https-01
__slots__ = ['priority', 'target', 'params']
def __init__(self, rdclass, rdtype, priority, target, params):
super().__init__(rdclass, rdtype)
self.priority = self._as_uint16(priority)
self.target = self._as_name(target)
for k, v in params.items():
k = ParamKey.make(k)
if not isinstance(v, Param) and v is not None:
raise ValueError("not a Param")
self.params = dns.immutable.Dict(params)
# Make sure any paramater listed as mandatory is present in the
# record.
mandatory = params.get(ParamKey.MANDATORY)
if mandatory:
for key in mandatory.keys:
# Note we have to say "not in" as we have None as a value
# so a get() and a not None test would be wrong.
if key not in params:
raise ValueError(f'key {key:d} declared mandatory but not '
'present')
# The no-default-alpn parameter requires the alpn parameter.
if ParamKey.NO_DEFAULT_ALPN in params:
if ParamKey.ALPN not in params:
raise ValueError('no-default-alpn present, but alpn missing')
def to_text(self, origin=None, relativize=True, **kw):
target = self.target.choose_relativity(origin, relativize)
params = []
for key in sorted(self.params.keys()):
value = self.params[key]
if value is None:
params.append(key_to_text(key))
else:
kv = key_to_text(key) + '=' + value.to_text()
params.append(kv)
if len(params) > 0:
space = ' '
else:
space = ''
return '%d %s%s%s' % (self.priority, target, space, ' '.join(params))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
priority = tok.get_uint16()
target = tok.get_name(origin, relativize, relativize_to)
if priority == 0:
token = tok.get()
if not token.is_eol_or_eof():
raise SyntaxError('parameters in AliasMode')
tok.unget(token)
params = {}
while True:
token = tok.get()
if token.is_eol_or_eof():
tok.unget(token)
break
if token.ttype != dns.tokenizer.IDENTIFIER:
raise SyntaxError('parameter is not an identifier')
equals = token.value.find('=')
if equals == len(token.value) - 1:
# 'key=', so next token should be a quoted string without
# any intervening whitespace.
key = token.value[:-1]
token = tok.get(want_leading=True)
if token.ttype != dns.tokenizer.QUOTED_STRING:
raise SyntaxError('whitespace after =')
value = token.value
elif equals > 0:
# key=value
key = token.value[:equals]
value = token.value[equals + 1:]
elif equals == 0:
# =key
raise SyntaxError('parameter cannot start with "="')
else:
# key
key = token.value
value = None
_validate_and_define(params, key, value)
return cls(rdclass, rdtype, priority, target, params)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(struct.pack("!H", self.priority))
self.target.to_wire(file, None, origin, False)
for key in sorted(self.params):
file.write(struct.pack("!H", key))
value = self.params[key]
# placeholder for length (or actual length of empty values)
file.write(struct.pack("!H", 0))
if value is None:
continue
else:
start = file.tell()
value.to_wire(file, origin)
end = file.tell()
assert end - start < 65536
file.seek(start - 2)
stuff = struct.pack("!H", end - start)
file.write(stuff)
file.seek(0, io.SEEK_END)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
priority = parser.get_uint16()
target = parser.get_name(origin)
if priority == 0 and parser.remaining() != 0:
raise dns.exception.FormError('parameters in AliasMode')
params = {}
prior_key = -1
while parser.remaining() > 0:
key = parser.get_uint16()
if key < prior_key:
raise dns.exception.FormError('keys not in order')
prior_key = key
vlen = parser.get_uint16()
pcls = _class_for_key.get(key, GenericParam)
with parser.restrict_to(vlen):
value = pcls.from_wire_parser(parser, origin)
params[key] = value
return cls(rdclass, rdtype, priority, target, params)
def _processing_priority(self):
return self.priority
@classmethod
def _processing_order(cls, iterable):
return dns.rdtypes.util.priority_processing_order(iterable)

View file

@ -0,0 +1,72 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2005-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import struct
import binascii
import dns.rdata
import dns.immutable
import dns.rdatatype
@dns.immutable.immutable
class TLSABase(dns.rdata.Rdata):
"""Base class for TLSA and SMIMEA records"""
# see: RFC 6698
__slots__ = ['usage', 'selector', 'mtype', 'cert']
def __init__(self, rdclass, rdtype, usage, selector,
mtype, cert):
super().__init__(rdclass, rdtype)
self.usage = self._as_uint8(usage)
self.selector = self._as_uint8(selector)
self.mtype = self._as_uint8(mtype)
self.cert = self._as_bytes(cert)
def to_text(self, origin=None, relativize=True, **kw):
kw = kw.copy()
chunksize = kw.pop('chunksize', 128)
return '%d %d %d %s' % (self.usage,
self.selector,
self.mtype,
dns.rdata._hexify(self.cert,
chunksize=chunksize,
**kw))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
usage = tok.get_uint8()
selector = tok.get_uint8()
mtype = tok.get_uint8()
cert = tok.concatenate_remaining_identifiers().encode()
cert = binascii.unhexlify(cert)
return cls(rdclass, rdtype, usage, selector, mtype, cert)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
header = struct.pack("!BBB", self.usage, self.selector, self.mtype)
file.write(header)
file.write(self.cert)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
header = parser.get_struct("BBB")
cert = parser.get_remaining()
return cls(rdclass, rdtype, header[0], header[1], header[2], cert)

View file

@ -20,10 +20,12 @@
import struct
import dns.exception
import dns.immutable
import dns.rdata
import dns.tokenizer
@dns.immutable.immutable
class TXTBase(dns.rdata.Rdata):
"""Base class for rdata that is like a TXT record (see RFC 1035)."""
@ -40,16 +42,8 @@ class TXTBase(dns.rdata.Rdata):
*strings*, a tuple of ``bytes``
"""
super().__init__(rdclass, rdtype)
if isinstance(strings, (bytes, str)):
strings = (strings,)
encoded_strings = []
for string in strings:
if isinstance(string, str):
string = string.encode()
else:
string = dns.rdata._constify(string)
encoded_strings.append(string)
object.__setattr__(self, 'strings', tuple(encoded_strings))
self.strings = self._as_tuple(strings,
lambda x: self._as_bytes(x, True, 255))
def to_text(self, origin=None, relativize=True, **kw):
txt = ''
@ -63,11 +57,12 @@ class TXTBase(dns.rdata.Rdata):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
strings = []
while 1:
token = tok.get().unescape_to_bytes()
if token.is_eol_or_eof():
break
if not (token.is_quoted_string() or token.is_identifier()):
for token in tok.get_remaining():
token = token.unescape_to_bytes()
# The 'if' below is always true in the current code, but we
# are leaving this check in in case things change some day.
if not (token.is_quoted_string() or
token.is_identifier()): # pragma: no cover
raise dns.exception.SyntaxError("expected a string")
if len(token.value) > 255:
raise dns.exception.SyntaxError("string too long")

View file

@ -0,0 +1,6 @@
from .. import rdata
class TXTBase(rdata.Rdata):
...
class TXT(TXTBase):
...

View file

@ -15,25 +15,31 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import collections
import random
import struct
import dns.exception
import dns.name
import dns.ipv4
import dns.ipv6
import dns.name
import dns.rdata
class Gateway:
"""A helper class for the IPSECKEY gateway and AMTRELAY relay fields"""
name = ""
def __init__(self, type, gateway=None):
self.type = type
self.type = dns.rdata.Rdata._as_uint8(type)
self.gateway = gateway
self._check()
def _invalid_type(self):
return f"invalid {self.name} type: {self.type}"
@classmethod
def _invalid_type(cls, gateway_type):
return f"invalid {cls.name} type: {gateway_type}"
def check(self):
def _check(self):
if self.type == 0:
if self.gateway not in (".", None):
raise SyntaxError(f"invalid {self.name} for type 0")
@ -48,7 +54,7 @@ class Gateway:
if not isinstance(self.gateway, dns.name.Name):
raise SyntaxError(f"invalid {self.name}; not a name")
else:
raise SyntaxError(self._invalid_type())
raise SyntaxError(self._invalid_type(self.type))
def to_text(self, origin=None, relativize=True):
if self.type == 0:
@ -58,16 +64,21 @@ class Gateway:
elif self.type == 3:
return str(self.gateway.choose_relativity(origin, relativize))
else:
raise ValueError(self._invalid_type())
raise ValueError(self._invalid_type(self.type)) # pragma: no cover
def from_text(self, tok, origin=None, relativize=True, relativize_to=None):
if self.type in (0, 1, 2):
return tok.get_string()
elif self.type == 3:
return tok.get_name(origin, relativize, relativize_to)
@classmethod
def from_text(cls, gateway_type, tok, origin=None, relativize=True,
relativize_to=None):
if gateway_type in (0, 1, 2):
gateway = tok.get_string()
elif gateway_type == 3:
gateway = tok.get_name(origin, relativize, relativize_to)
else:
raise dns.exception.SyntaxError(self._invalid_type())
raise dns.exception.SyntaxError(
cls._invalid_type(gateway_type)) # pragma: no cover
return cls(gateway_type, gateway)
# pylint: disable=unused-argument
def to_wire(self, file, compress=None, origin=None, canonicalize=False):
if self.type == 0:
pass
@ -78,26 +89,43 @@ class Gateway:
elif self.type == 3:
self.gateway.to_wire(file, None, origin, False)
else:
raise ValueError(self._invalid_type())
raise ValueError(self._invalid_type(self.type)) # pragma: no cover
# pylint: enable=unused-argument
def from_wire_parser(self, parser, origin=None):
if self.type == 0:
return None
elif self.type == 1:
return dns.ipv4.inet_ntoa(parser.get_bytes(4))
elif self.type == 2:
return dns.ipv6.inet_ntoa(parser.get_bytes(16))
elif self.type == 3:
return parser.get_name(origin)
@classmethod
def from_wire_parser(cls, gateway_type, parser, origin=None):
if gateway_type == 0:
gateway = None
elif gateway_type == 1:
gateway = dns.ipv4.inet_ntoa(parser.get_bytes(4))
elif gateway_type == 2:
gateway = dns.ipv6.inet_ntoa(parser.get_bytes(16))
elif gateway_type == 3:
gateway = parser.get_name(origin)
else:
raise dns.exception.FormError(self._invalid_type())
raise dns.exception.FormError(cls._invalid_type(gateway_type))
return cls(gateway_type, gateway)
class Bitmap:
"""A helper class for the NSEC/NSEC3/CSYNC type bitmaps"""
type_name = ""
def __init__(self, windows=None):
last_window = -1
self.windows = windows
for (window, bitmap) in self.windows:
if not isinstance(window, int):
raise ValueError(f"bad {self.type_name} window type")
if window <= last_window:
raise ValueError(f"bad {self.type_name} window order")
if window > 256:
raise ValueError(f"bad {self.type_name} window number")
last_window = window
if not isinstance(bitmap, bytes):
raise ValueError(f"bad {self.type_name} octets type")
if len(bitmap) == 0 or len(bitmap) > 32:
raise ValueError(f"bad {self.type_name} octets")
def to_text(self):
text = ""
@ -111,15 +139,13 @@ class Bitmap:
text += (' ' + ' '.join(bits))
return text
def from_text(self, tok):
@classmethod
def from_text(cls, tok):
rdtypes = []
while True:
token = tok.get().unescape()
if token.is_eol_or_eof():
break
rdtype = dns.rdatatype.from_text(token.value)
for token in tok.get_remaining():
rdtype = dns.rdatatype.from_text(token.unescape().value)
if rdtype == 0:
raise dns.exception.SyntaxError(f"{self.type_name} with bit 0")
raise dns.exception.SyntaxError(f"{cls.type_name} with bit 0")
rdtypes.append(rdtype)
rdtypes.sort()
window = 0
@ -134,7 +160,7 @@ class Bitmap:
new_window = rdtype // 256
if new_window != window:
if octets != 0:
windows.append((window, bitmap[0:octets]))
windows.append((window, bytes(bitmap[0:octets])))
bitmap = bytearray(b'\0' * 32)
window = new_window
offset = rdtype % 256
@ -143,24 +169,76 @@ class Bitmap:
octets = byte + 1
bitmap[byte] = bitmap[byte] | (0x80 >> bit)
if octets != 0:
windows.append((window, bitmap[0:octets]))
return windows
windows.append((window, bytes(bitmap[0:octets])))
return cls(windows)
def to_wire(self, file):
for (window, bitmap) in self.windows:
file.write(struct.pack('!BB', window, len(bitmap)))
file.write(bitmap)
def from_wire_parser(self, parser):
@classmethod
def from_wire_parser(cls, parser):
windows = []
last_window = -1
while parser.remaining() > 0:
window = parser.get_uint8()
if window <= last_window:
raise dns.exception.FormError(f"bad {self.type_name} bitmap")
bitmap = parser.get_counted_bytes()
if len(bitmap) == 0 or len(bitmap) > 32:
raise dns.exception.FormError(f"bad {self.type_name} octets")
windows.append((window, bitmap))
last_window = window
return windows
return cls(windows)
def _priority_table(items):
by_priority = collections.defaultdict(list)
for rdata in items:
by_priority[rdata._processing_priority()].append(rdata)
return by_priority
def priority_processing_order(iterable):
items = list(iterable)
if len(items) == 1:
return items
by_priority = _priority_table(items)
ordered = []
for k in sorted(by_priority.keys()):
rdatas = by_priority[k]
random.shuffle(rdatas)
ordered.extend(rdatas)
return ordered
_no_weight = 0.1
def weighted_processing_order(iterable):
items = list(iterable)
if len(items) == 1:
return items
by_priority = _priority_table(items)
ordered = []
for k in sorted(by_priority.keys()):
rdatas = by_priority[k]
total = sum(rdata._processing_weight() or _no_weight
for rdata in rdatas)
while len(rdatas) > 1:
r = random.uniform(0, total)
for (n, rdata) in enumerate(rdatas):
weight = rdata._processing_weight() or _no_weight
if weight > r:
break
r -= weight
total -= weight
ordered.append(rdata) # pylint: disable=undefined-loop-variable
del rdatas[n] # pylint: disable=undefined-loop-variable
ordered.append(rdatas[0])
return ordered
def parse_formatted_hex(formatted, num_chunks, chunk_size, separator):
if len(formatted) != num_chunks * (chunk_size + 1) - 1:
raise ValueError('invalid formatted hex string')
value = b''
for _ in range(num_chunks):
chunk = formatted[0:chunk_size]
value += int(chunk, 16).to_bytes(chunk_size // 2, 'big')
formatted = formatted[chunk_size:]
if len(formatted) > 0 and formatted[0] != separator:
raise ValueError('invalid formatted hex string')
formatted = formatted[1:]
return value