Update dnspython-2.2.0

This commit is contained in:
JonnyWong16 2021-10-14 21:36:41 -07:00
commit 4d62245cf5
No known key found for this signature in database
GPG key ID: B1F1F9807184697A
111 changed files with 9077 additions and 5877 deletions

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -18,12 +20,7 @@ import dns.rdtypes.mxbase
class AFSDB(dns.rdtypes.mxbase.UncompressedDowncasingMX):
"""AFSDB record
@ivar subtype: the subtype value
@type subtype: int
@ivar hostname: the hostname name
@type hostname: dns.name.Name object"""
"""AFSDB record"""
# Use the property mechanism to make "subtype" an alias for the
# "preference" attribute, and "hostname" an alias for the "exchange"
@ -36,18 +33,12 @@ class AFSDB(dns.rdtypes.mxbase.UncompressedDowncasingMX):
# implementation, but this way we don't copy code, and that's
# good.
def get_subtype(self):
@property
def subtype(self):
"the AFSDB subtype"
return self.preference
def set_subtype(self, subtype):
self.preference = subtype
subtype = property(get_subtype, set_subtype)
def get_hostname(self):
@property
def hostname(self):
"the AFSDB hostname"
return self.exchange
def set_hostname(self, hostname):
self.exchange = hostname
hostname = property(get_hostname, set_hostname)

View file

@ -0,0 +1,79 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2006, 2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import struct
import dns.exception
import dns.rdtypes.util
class Relay(dns.rdtypes.util.Gateway):
name = 'AMTRELAY relay'
class AMTRELAY(dns.rdata.Rdata):
"""AMTRELAY record"""
# see: RFC 8777
__slots__ = ['precedence', 'discovery_optional', 'relay_type', 'relay']
def __init__(self, rdclass, rdtype, precedence, discovery_optional,
relay_type, relay):
super().__init__(rdclass, rdtype)
Relay(relay_type, relay).check()
object.__setattr__(self, 'precedence', precedence)
object.__setattr__(self, 'discovery_optional', discovery_optional)
object.__setattr__(self, 'relay_type', relay_type)
object.__setattr__(self, 'relay', relay)
def to_text(self, origin=None, relativize=True, **kw):
relay = Relay(self.relay_type, self.relay).to_text(origin, relativize)
return '%d %d %d %s' % (self.precedence, self.discovery_optional,
self.relay_type, relay)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
precedence = tok.get_uint8()
discovery_optional = tok.get_uint8()
if discovery_optional > 1:
raise dns.exception.SyntaxError('expecting 0 or 1')
discovery_optional = bool(discovery_optional)
relay_type = tok.get_uint8()
if relay_type > 0x7f:
raise dns.exception.SyntaxError('expecting an integer <= 127')
relay = Relay(relay_type).from_text(tok, origin, relativize,
relativize_to)
return cls(rdclass, rdtype, precedence, discovery_optional, relay_type,
relay)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
relay_type = self.relay_type | (self.discovery_optional << 7)
header = struct.pack("!BB", self.precedence, relay_type)
file.write(header)
Relay(self.relay_type, self.relay).to_wire(file, compress, origin,
canonicalize)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(precedence, relay_type) = parser.get_struct('!BB')
discovery_optional = bool(relay_type >> 7)
relay_type &= 0x7f
relay = Relay(relay_type).from_wire_parser(parser, origin)
return cls(rdclass, rdtype, precedence, discovery_optional, relay_type,
relay)

View file

@ -0,0 +1,25 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2016 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.txtbase
class AVC(dns.rdtypes.txtbase.TXTBase):
"""AVC record"""
# See: IANA dns parameters for AVC

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -22,23 +24,17 @@ import dns.tokenizer
class CAA(dns.rdata.Rdata):
"""CAA (Certification Authority Authorization) record
"""CAA (Certification Authority Authorization) record"""
@ivar flags: the flags
@type flags: int
@ivar tag: the tag
@type tag: string
@ivar value: the value
@type value: string
@see: RFC 6844"""
# see: RFC 6844
__slots__ = ['flags', 'tag', 'value']
def __init__(self, rdclass, rdtype, flags, tag, value):
super(CAA, self).__init__(rdclass, rdtype)
self.flags = flags
self.tag = tag
self.value = value
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'flags', flags)
object.__setattr__(self, 'tag', tag)
object.__setattr__(self, 'value', value)
def to_text(self, origin=None, relativize=True, **kw):
return '%u %s "%s"' % (self.flags,
@ -46,7 +42,8 @@ class CAA(dns.rdata.Rdata):
dns.rdata._escapify(self.value))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
flags = tok.get_uint8()
tag = tok.get_string().encode()
if len(tag) > 255:
@ -56,7 +53,7 @@ class CAA(dns.rdata.Rdata):
value = tok.get_string().encode()
return cls(rdclass, rdtype, flags, tag, value)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(struct.pack('!B', self.flags))
l = len(self.tag)
assert l < 256
@ -65,10 +62,8 @@ class CAA(dns.rdata.Rdata):
file.write(self.value)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
(flags, l) = struct.unpack('!BB', wire[current: current + 2])
current += 2
tag = wire[current: current + l]
value = wire[current + l:current + rdlen - 2]
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
flags = parser.get_uint8()
tag = parser.get_counted_bytes()
value = parser.get_remaining()
return cls(rdclass, rdtype, flags, tag, value)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2004-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -14,10 +16,7 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.dnskeybase
from dns.rdtypes.dnskeybase import flags_to_text_set, flags_from_text_set
__all__ = ['flags_to_text_set', 'flags_from_text_set']
from dns.rdtypes.dnskeybase import SEP, REVOKE, ZONE # noqa: F401
class CDNSKEY(dns.rdtypes.dnskeybase.DNSKEYBase):

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -54,27 +56,19 @@ def _ctype_to_text(what):
class CERT(dns.rdata.Rdata):
"""CERT record
"""CERT record"""
@ivar certificate_type: certificate type
@type certificate_type: int
@ivar key_tag: key tag
@type key_tag: int
@ivar algorithm: algorithm
@type algorithm: int
@ivar certificate: the certificate or CRL
@type certificate: string
@see: RFC 2538"""
# see RFC 2538
__slots__ = ['certificate_type', 'key_tag', 'algorithm', 'certificate']
def __init__(self, rdclass, rdtype, certificate_type, key_tag, algorithm,
certificate):
super(CERT, self).__init__(rdclass, rdtype)
self.certificate_type = certificate_type
self.key_tag = key_tag
self.algorithm = algorithm
self.certificate = certificate
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'certificate_type', certificate_type)
object.__setattr__(self, 'key_tag', key_tag)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'certificate', certificate)
def to_text(self, origin=None, relativize=True, **kw):
certificate_type = _ctype_to_text(self.certificate_type)
@ -83,40 +77,27 @@ class CERT(dns.rdata.Rdata):
dns.rdata._base64ify(self.certificate))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
certificate_type = _ctype_from_text(tok.get_string())
key_tag = tok.get_uint16()
algorithm = dns.dnssec.algorithm_from_text(tok.get_string())
if algorithm < 0 or algorithm > 255:
raise dns.exception.SyntaxError("bad algorithm type")
chunks = []
while 1:
t = tok.get().unescape()
if t.is_eol_or_eof():
break
if not t.is_identifier():
raise dns.exception.SyntaxError
chunks.append(t.value.encode())
b64 = b''.join(chunks)
b64 = tok.concatenate_remaining_identifiers().encode()
certificate = base64.b64decode(b64)
return cls(rdclass, rdtype, certificate_type, key_tag,
algorithm, certificate)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
prefix = struct.pack("!HHB", self.certificate_type, self.key_tag,
self.algorithm)
file.write(prefix)
file.write(self.certificate)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
prefix = wire[current: current + 5].unwrap()
current += 5
rdlen -= 5
if rdlen < 0:
raise dns.exception.FormError
(certificate_type, key_tag, algorithm) = struct.unpack("!HHB", prefix)
certificate = wire[current: current + rdlen].unwrap()
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(certificate_type, key_tag, algorithm) = parser.get_struct("!HHB")
certificate = parser.get_remaining()
return cls(rdclass, rdtype, certificate_type, key_tag, algorithm,
certificate)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2004-2007, 2009-2011, 2016 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -19,106 +21,43 @@ import dns.exception
import dns.rdata
import dns.rdatatype
import dns.name
from dns._compat import xrange
import dns.rdtypes.util
class Bitmap(dns.rdtypes.util.Bitmap):
type_name = 'CSYNC'
class CSYNC(dns.rdata.Rdata):
"""CSYNC record
@ivar serial: the SOA serial number
@type serial: int
@ivar flags: the CSYNC flags
@type flags: int
@ivar windows: the windowed bitmap list
@type windows: list of (window number, string) tuples"""
"""CSYNC record"""
__slots__ = ['serial', 'flags', 'windows']
def __init__(self, rdclass, rdtype, serial, flags, windows):
super(CSYNC, self).__init__(rdclass, rdtype)
self.serial = serial
self.flags = flags
self.windows = windows
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'serial', serial)
object.__setattr__(self, 'flags', flags)
object.__setattr__(self, 'windows', dns.rdata._constify(windows))
def to_text(self, origin=None, relativize=True, **kw):
text = ''
for (window, bitmap) in self.windows:
bits = []
for i in xrange(0, len(bitmap)):
byte = bitmap[i]
for j in xrange(0, 8):
if byte & (0x80 >> j):
bits.append(dns.rdatatype.to_text(window * 256 +
i * 8 + j))
text += (' ' + ' '.join(bits))
text = Bitmap(self.windows).to_text()
return '%d %d%s' % (self.serial, self.flags, text)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
serial = tok.get_uint32()
flags = tok.get_uint16()
rdtypes = []
while 1:
token = tok.get().unescape()
if token.is_eol_or_eof():
break
nrdtype = dns.rdatatype.from_text(token.value)
if nrdtype == 0:
raise dns.exception.SyntaxError("CSYNC with bit 0")
if nrdtype > 65535:
raise dns.exception.SyntaxError("CSYNC with bit > 65535")
rdtypes.append(nrdtype)
rdtypes.sort()
window = 0
octets = 0
prior_rdtype = 0
bitmap = bytearray(b'\0' * 32)
windows = []
for nrdtype in rdtypes:
if nrdtype == prior_rdtype:
continue
prior_rdtype = nrdtype
new_window = nrdtype // 256
if new_window != window:
windows.append((window, bitmap[0:octets]))
bitmap = bytearray(b'\0' * 32)
window = new_window
offset = nrdtype % 256
byte = offset // 8
bit = offset % 8
octets = byte + 1
bitmap[byte] = bitmap[byte] | (0x80 >> bit)
windows.append((window, bitmap[0:octets]))
windows = Bitmap().from_text(tok)
return cls(rdclass, rdtype, serial, flags, windows)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(struct.pack('!IH', self.serial, self.flags))
for (window, bitmap) in self.windows:
file.write(struct.pack('!BB', window, len(bitmap)))
file.write(bitmap)
Bitmap(self.windows).to_wire(file)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
if rdlen < 6:
raise dns.exception.FormError("CSYNC too short")
(serial, flags) = struct.unpack("!IH", wire[current: current + 6])
current += 6
rdlen -= 6
windows = []
while rdlen > 0:
if rdlen < 3:
raise dns.exception.FormError("CSYNC too short")
window = wire[current]
octets = wire[current + 1]
if octets == 0 or octets > 32:
raise dns.exception.FormError("bad CSYNC octets")
current += 2
rdlen -= 2
if rdlen < octets:
raise dns.exception.FormError("bad CSYNC bitmap length")
bitmap = bytearray(wire[current: current + octets].unwrap())
current += octets
rdlen -= octets
windows.append((window, bitmap))
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(serial, flags) = parser.get_struct("!IH")
windows = Bitmap().from_wire_parser(parser)
return cls(rdclass, rdtype, serial, flags, windows)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -20,5 +22,5 @@ class DNAME(dns.rdtypes.nsbase.UncompressedNS):
"""DNAME record"""
def to_digestable(self, origin=None):
return self.target.to_digestable(origin)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
self.target.to_wire(file, None, origin, canonicalize)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2004-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -14,10 +16,7 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.dnskeybase
from dns.rdtypes.dnskeybase import flags_to_text_set, flags_from_text_set
__all__ = ['flags_to_text_set', 'flags_from_text_set']
from dns.rdtypes.dnskeybase import SEP, REVOKE, ZONE # noqa: F401
class DNSKEY(dns.rdtypes.dnskeybase.DNSKEYBase):

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2015 Red Hat, Inc.
# Author: Petr Spacek <pspacek@redhat.com>
#
@ -19,11 +21,9 @@ import dns.rdtypes.euibase
class EUI48(dns.rdtypes.euibase.EUIBase):
"""EUI48 record
"""EUI48 record"""
@ivar fingerprint: 48-bit Extended Unique Identifier (EUI-48)
@type fingerprint: string
@see: rfc7043.txt"""
# see: rfc7043.txt
byte_len = 6 # 0123456789ab (in hex)
text_len = byte_len * 3 - 1 # 01-23-45-67-89-ab

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2015 Red Hat, Inc.
# Author: Petr Spacek <pspacek@redhat.com>
#
@ -19,11 +21,9 @@ import dns.rdtypes.euibase
class EUI64(dns.rdtypes.euibase.EUIBase):
"""EUI64 record
"""EUI64 record"""
@ivar fingerprint: 64-bit Extended Unique Identifier (EUI-64)
@type fingerprint: string
@see: rfc7043.txt"""
# see: rfc7043.txt
byte_len = 8 # 0123456789abcdef (in hex)
text_len = byte_len * 3 - 1 # 01-23-45-67-89-ab-cd-ef

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -18,15 +20,19 @@ import struct
import dns.exception
import dns.rdata
import dns.tokenizer
from dns._compat import long, text_type
def _validate_float_string(what):
if len(what) == 0:
raise dns.exception.FormError
if what[0] == b'-'[0] or what[0] == b'+'[0]:
what = what[1:]
if what.isdigit():
return
(left, right) = what.split(b'.')
try:
(left, right) = what.split(b'.')
except ValueError:
raise dns.exception.FormError
if left == b'' and right == b'':
raise dns.exception.FormError
if not left == b'' and not left.decode().isdigit():
@ -36,38 +42,29 @@ def _validate_float_string(what):
def _sanitize(value):
if isinstance(value, text_type):
if isinstance(value, str):
return value.encode()
return value
class GPOS(dns.rdata.Rdata):
"""GPOS record
"""GPOS record"""
@ivar latitude: latitude
@type latitude: string
@ivar longitude: longitude
@type longitude: string
@ivar altitude: altitude
@type altitude: string
@see: RFC 1712"""
# see: RFC 1712
__slots__ = ['latitude', 'longitude', 'altitude']
def __init__(self, rdclass, rdtype, latitude, longitude, altitude):
super(GPOS, self).__init__(rdclass, rdtype)
super().__init__(rdclass, rdtype)
if isinstance(latitude, float) or \
isinstance(latitude, int) or \
isinstance(latitude, long):
isinstance(latitude, int):
latitude = str(latitude)
if isinstance(longitude, float) or \
isinstance(longitude, int) or \
isinstance(longitude, long):
isinstance(longitude, int):
longitude = str(longitude)
if isinstance(altitude, float) or \
isinstance(altitude, int) or \
isinstance(altitude, long):
isinstance(altitude, int):
altitude = str(altitude)
latitude = _sanitize(latitude)
longitude = _sanitize(longitude)
@ -75,24 +72,31 @@ class GPOS(dns.rdata.Rdata):
_validate_float_string(latitude)
_validate_float_string(longitude)
_validate_float_string(altitude)
self.latitude = latitude
self.longitude = longitude
self.altitude = altitude
object.__setattr__(self, 'latitude', latitude)
object.__setattr__(self, 'longitude', longitude)
object.__setattr__(self, 'altitude', altitude)
flat = self.float_latitude
if flat < -90.0 or flat > 90.0:
raise dns.exception.FormError('bad latitude')
flong = self.float_longitude
if flong < -180.0 or flong > 180.0:
raise dns.exception.FormError('bad longitude')
def to_text(self, origin=None, relativize=True, **kw):
return '%s %s %s' % (self.latitude.decode(),
self.longitude.decode(),
self.altitude.decode())
return '{} {} {}'.format(self.latitude.decode(),
self.longitude.decode(),
self.altitude.decode())
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
latitude = tok.get_string()
longitude = tok.get_string()
altitude = tok.get_string()
tok.get_eol()
return cls(rdclass, rdtype, latitude, longitude, altitude)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
l = len(self.latitude)
assert l < 256
file.write(struct.pack('!B', l))
@ -107,54 +111,23 @@ class GPOS(dns.rdata.Rdata):
file.write(self.altitude)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
l = wire[current]
current += 1
rdlen -= 1
if l > rdlen:
raise dns.exception.FormError
latitude = wire[current: current + l].unwrap()
current += l
rdlen -= l
l = wire[current]
current += 1
rdlen -= 1
if l > rdlen:
raise dns.exception.FormError
longitude = wire[current: current + l].unwrap()
current += l
rdlen -= l
l = wire[current]
current += 1
rdlen -= 1
if l != rdlen:
raise dns.exception.FormError
altitude = wire[current: current + l].unwrap()
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
latitude = parser.get_counted_bytes()
longitude = parser.get_counted_bytes()
altitude = parser.get_counted_bytes()
return cls(rdclass, rdtype, latitude, longitude, altitude)
def _get_float_latitude(self):
@property
def float_latitude(self):
"latitude as a floating point value"
return float(self.latitude)
def _set_float_latitude(self, value):
self.latitude = str(value)
float_latitude = property(_get_float_latitude, _set_float_latitude,
doc="latitude as a floating point value")
def _get_float_longitude(self):
@property
def float_longitude(self):
"longitude as a floating point value"
return float(self.longitude)
def _set_float_longitude(self, value):
self.longitude = str(value)
float_longitude = property(_get_float_longitude, _set_float_longitude,
doc="longitude as a floating point value")
def _get_float_altitude(self):
@property
def float_altitude(self):
"altitude as a floating point value"
return float(self.altitude)
def _set_float_altitude(self, value):
self.altitude = str(value)
float_altitude = property(_get_float_altitude, _set_float_altitude,
doc="altitude as a floating point value")

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -18,44 +20,40 @@ import struct
import dns.exception
import dns.rdata
import dns.tokenizer
from dns._compat import text_type
class HINFO(dns.rdata.Rdata):
"""HINFO record
"""HINFO record"""
@ivar cpu: the CPU type
@type cpu: string
@ivar os: the OS type
@type os: string
@see: RFC 1035"""
# see: RFC 1035
__slots__ = ['cpu', 'os']
def __init__(self, rdclass, rdtype, cpu, os):
super(HINFO, self).__init__(rdclass, rdtype)
if isinstance(cpu, text_type):
self.cpu = cpu.encode()
super().__init__(rdclass, rdtype)
if isinstance(cpu, str):
object.__setattr__(self, 'cpu', cpu.encode())
else:
self.cpu = cpu
if isinstance(os, text_type):
self.os = os.encode()
object.__setattr__(self, 'cpu', cpu)
if isinstance(os, str):
object.__setattr__(self, 'os', os.encode())
else:
self.os = os
object.__setattr__(self, 'os', os)
def to_text(self, origin=None, relativize=True, **kw):
return '"%s" "%s"' % (dns.rdata._escapify(self.cpu),
dns.rdata._escapify(self.os))
return '"{}" "{}"'.format(dns.rdata._escapify(self.cpu),
dns.rdata._escapify(self.os))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
cpu = tok.get_string()
os = tok.get_string()
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
cpu = tok.get_string(max_length=255)
os = tok.get_string(max_length=255)
tok.get_eol()
return cls(rdclass, rdtype, cpu, os)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
l = len(self.cpu)
assert l < 256
file.write(struct.pack('!B', l))
@ -66,20 +64,7 @@ class HINFO(dns.rdata.Rdata):
file.write(self.os)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
l = wire[current]
current += 1
rdlen -= 1
if l > rdlen:
raise dns.exception.FormError
cpu = wire[current:current + l].unwrap()
current += l
rdlen -= l
l = wire[current]
current += 1
rdlen -= 1
if l != rdlen:
raise dns.exception.FormError
os = wire[current: current + l].unwrap()
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
cpu = parser.get_counted_bytes()
os = parser.get_counted_bytes()
return cls(rdclass, rdtype, cpu, os)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2010, 2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -24,40 +26,33 @@ import dns.rdatatype
class HIP(dns.rdata.Rdata):
"""HIP record
"""HIP record"""
@ivar hit: the host identity tag
@type hit: string
@ivar algorithm: the public key cryptographic algorithm
@type algorithm: int
@ivar key: the public key
@type key: string
@ivar servers: the rendezvous servers
@type servers: list of dns.name.Name objects
@see: RFC 5205"""
# see: RFC 5205
__slots__ = ['hit', 'algorithm', 'key', 'servers']
def __init__(self, rdclass, rdtype, hit, algorithm, key, servers):
super(HIP, self).__init__(rdclass, rdtype)
self.hit = hit
self.algorithm = algorithm
self.key = key
self.servers = servers
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'hit', hit)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'key', key)
object.__setattr__(self, 'servers', dns.rdata._constify(servers))
def to_text(self, origin=None, relativize=True, **kw):
hit = binascii.hexlify(self.hit).decode()
key = base64.b64encode(self.key).replace(b'\n', b'').decode()
text = u''
text = ''
servers = []
for server in self.servers:
servers.append(server.choose_relativity(origin, relativize))
if len(servers) > 0:
text += (u' ' + u' '.join(map(lambda x: x.to_unicode(), servers)))
return u'%u %s %s%s' % (self.algorithm, hit, key, text)
text += (' ' + ' '.join((x.to_unicode() for x in servers)))
return '%u %s %s%s' % (self.algorithm, hit, key, text)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
algorithm = tok.get_uint8()
hit = binascii.unhexlify(tok.get_string().encode())
if len(hit) > 255:
@ -68,46 +63,26 @@ class HIP(dns.rdata.Rdata):
token = tok.get()
if token.is_eol_or_eof():
break
server = dns.name.from_text(token.value, origin)
server.choose_relativity(origin, relativize)
server = tok.as_name(token, origin, relativize, relativize_to)
servers.append(server)
return cls(rdclass, rdtype, hit, algorithm, key, servers)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
lh = len(self.hit)
lk = len(self.key)
file.write(struct.pack("!BBH", lh, self.algorithm, lk))
file.write(self.hit)
file.write(self.key)
for server in self.servers:
server.to_wire(file, None, origin)
server.to_wire(file, None, origin, False)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
(lh, algorithm, lk) = struct.unpack('!BBH',
wire[current: current + 4])
current += 4
rdlen -= 4
hit = wire[current: current + lh].unwrap()
current += lh
rdlen -= lh
key = wire[current: current + lk].unwrap()
current += lk
rdlen -= lk
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(lh, algorithm, lk) = parser.get_struct('!BBH')
hit = parser.get_bytes(lh)
key = parser.get_bytes(lk)
servers = []
while rdlen > 0:
(server, cused) = dns.name.from_wire(wire[: current + rdlen],
current)
current += cused
rdlen -= cused
if origin is not None:
server = server.relativize(origin)
while parser.remaining() > 0:
server = parser.get_name(origin)
servers.append(server)
return cls(rdclass, rdtype, hit, algorithm, key, servers)
def choose_relativity(self, origin=None, relativize=True):
servers = []
for server in self.servers:
server = server.choose_relativity(origin, relativize)
servers.append(server)
self.servers = servers

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -18,41 +20,37 @@ import struct
import dns.exception
import dns.rdata
import dns.tokenizer
from dns._compat import text_type
class ISDN(dns.rdata.Rdata):
"""ISDN record
"""ISDN record"""
@ivar address: the ISDN address
@type address: string
@ivar subaddress: the ISDN subaddress (or '' if not present)
@type subaddress: string
@see: RFC 1183"""
# see: RFC 1183
__slots__ = ['address', 'subaddress']
def __init__(self, rdclass, rdtype, address, subaddress):
super(ISDN, self).__init__(rdclass, rdtype)
if isinstance(address, text_type):
self.address = address.encode()
super().__init__(rdclass, rdtype)
if isinstance(address, str):
object.__setattr__(self, 'address', address.encode())
else:
self.address = address
if isinstance(address, text_type):
self.subaddress = subaddress.encode()
object.__setattr__(self, 'address', address)
if isinstance(address, str):
object.__setattr__(self, 'subaddress', subaddress.encode())
else:
self.subaddress = subaddress
object.__setattr__(self, 'subaddress', subaddress)
def to_text(self, origin=None, relativize=True, **kw):
if self.subaddress:
return '"%s" "%s"' % (dns.rdata._escapify(self.address),
dns.rdata._escapify(self.subaddress))
return '"{}" "{}"'.format(dns.rdata._escapify(self.address),
dns.rdata._escapify(self.subaddress))
else:
return '"%s"' % dns.rdata._escapify(self.address)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
address = tok.get_string()
t = tok.get()
if not t.is_eol_or_eof():
@ -64,7 +62,7 @@ class ISDN(dns.rdata.Rdata):
tok.get_eol()
return cls(rdclass, rdtype, address, subaddress)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
l = len(self.address)
assert l < 256
file.write(struct.pack('!B', l))
@ -76,23 +74,10 @@ class ISDN(dns.rdata.Rdata):
file.write(self.subaddress)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
l = wire[current]
current += 1
rdlen -= 1
if l > rdlen:
raise dns.exception.FormError
address = wire[current: current + l].unwrap()
current += l
rdlen -= l
if rdlen > 0:
l = wire[current]
current += 1
rdlen -= 1
if l != rdlen:
raise dns.exception.FormError
subaddress = wire[current: current + l].unwrap()
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
address = parser.get_counted_bytes()
if parser.remaining() > 0:
subaddress = parser.get_counted_bytes()
else:
subaddress = ''
subaddress = b''
return cls(rdclass, rdtype, address, subaddress)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -17,23 +19,32 @@ import struct
import dns.exception
import dns.rdata
from dns._compat import long, xrange
_pows = tuple(long(10**i) for i in range(0, 11))
_pows = tuple(10**i for i in range(0, 11))
# default values are in centimeters
_default_size = 100.0
_default_hprec = 1000000.0
_default_vprec = 1000.0
# for use by from_wire()
_MAX_LATITUDE = 0x80000000 + 90 * 3600000
_MIN_LATITUDE = 0x80000000 - 90 * 3600000
_MAX_LONGITUDE = 0x80000000 + 180 * 3600000
_MIN_LONGITUDE = 0x80000000 - 180 * 3600000
# pylint complains about division since we don't have a from __future__ for
# it, but we don't care about python 2 warnings, so turn them off.
#
# pylint: disable=old-division
def _exponent_of(what, desc):
if what == 0:
return 0
exp = None
for i in xrange(len(_pows)):
if what // _pows[i] == long(0):
for (i, pow) in enumerate(_pows):
if what // pow == 0:
exp = i - 1
break
if exp is None or exp < 0:
@ -47,7 +58,7 @@ def _float_to_tuple(what):
what *= -1
else:
sign = 1
what = long(round(what * 3600000))
what = round(what * 3600000) # pylint: disable=round-builtin
degrees = int(what // 3600000)
what -= degrees * 3600000
minutes = int(what // 60000)
@ -67,7 +78,7 @@ def _tuple_to_float(what):
def _encode_size(what, desc):
what = long(what)
what = int(what)
exponent = _exponent_of(what, desc) & 0xF
base = what // pow(10, exponent) & 0xF
return base * 16 + exponent
@ -76,32 +87,18 @@ def _encode_size(what, desc):
def _decode_size(what, desc):
exponent = what & 0x0F
if exponent > 9:
raise dns.exception.SyntaxError("bad %s exponent" % desc)
raise dns.exception.FormError("bad %s exponent" % desc)
base = (what & 0xF0) >> 4
if base > 9:
raise dns.exception.SyntaxError("bad %s base" % desc)
return long(base) * pow(10, exponent)
raise dns.exception.FormError("bad %s base" % desc)
return base * pow(10, exponent)
class LOC(dns.rdata.Rdata):
"""LOC record
"""LOC record"""
@ivar latitude: latitude
@type latitude: (int, int, int, int, sign) tuple specifying the degrees, minutes,
seconds, milliseconds, and sign of the coordinate.
@ivar longitude: longitude
@type longitude: (int, int, int, int, sign) tuple specifying the degrees,
minutes, seconds, milliseconds, and sign of the coordinate.
@ivar altitude: altitude
@type altitude: float
@ivar size: size of the sphere
@type size: float
@ivar horizontal_precision: horizontal precision
@type horizontal_precision: float
@ivar vertical_precision: vertical precision
@type vertical_precision: float
@see: RFC 1876"""
# see: RFC 1876
__slots__ = ['latitude', 'longitude', 'altitude', 'size',
'horizontal_precision', 'vertical_precision']
@ -117,35 +114,31 @@ class LOC(dns.rdata.Rdata):
degrees. The other parameters are floats. Size, horizontal precision,
and vertical precision are specified in centimeters."""
super(LOC, self).__init__(rdclass, rdtype)
if isinstance(latitude, int) or isinstance(latitude, long):
super().__init__(rdclass, rdtype)
if isinstance(latitude, int):
latitude = float(latitude)
if isinstance(latitude, float):
latitude = _float_to_tuple(latitude)
self.latitude = latitude
if isinstance(longitude, int) or isinstance(longitude, long):
object.__setattr__(self, 'latitude', dns.rdata._constify(latitude))
if isinstance(longitude, int):
longitude = float(longitude)
if isinstance(longitude, float):
longitude = _float_to_tuple(longitude)
self.longitude = longitude
self.altitude = float(altitude)
self.size = float(size)
self.horizontal_precision = float(hprec)
self.vertical_precision = float(vprec)
object.__setattr__(self, 'longitude', dns.rdata._constify(longitude))
object.__setattr__(self, 'altitude', float(altitude))
object.__setattr__(self, 'size', float(size))
object.__setattr__(self, 'horizontal_precision', float(hprec))
object.__setattr__(self, 'vertical_precision', float(vprec))
def to_text(self, origin=None, relativize=True, **kw):
if self.latitude[4] > 0:
lat_hemisphere = 'N'
lat_degrees = self.latitude[0]
else:
lat_hemisphere = 'S'
lat_degrees = -1 * self.latitude[0]
if self.longitude[4] > 0:
long_hemisphere = 'E'
long_degrees = self.longitude[0]
else:
long_hemisphere = 'W'
long_degrees = -1 * self.longitude[0]
text = "%d %d %d.%03d %s %d %d %d.%03d %s %0.2fm" % (
self.latitude[0], self.latitude[1],
self.latitude[2], self.latitude[3], lat_hemisphere,
@ -158,14 +151,15 @@ class LOC(dns.rdata.Rdata):
if self.size != _default_size or \
self.horizontal_precision != _default_hprec or \
self.vertical_precision != _default_vprec:
text += " %0.2fm %0.2fm %0.2fm" % (
text += " {:0.2f}m {:0.2f}m {:0.2f}m".format(
self.size / 100.0, self.horizontal_precision / 100.0,
self.vertical_precision / 100.0
)
return text
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
latitude = [0, 0, 0, 0, 1]
longitude = [0, 0, 0, 0, 1]
size = _default_size
@ -173,9 +167,13 @@ class LOC(dns.rdata.Rdata):
vprec = _default_vprec
latitude[0] = tok.get_int()
if latitude[0] > 90:
raise dns.exception.SyntaxError('latitude >= 90')
t = tok.get_string()
if t.isdigit():
latitude[1] = int(t)
if latitude[1] >= 60:
raise dns.exception.SyntaxError('latitude minutes >= 60')
t = tok.get_string()
if '.' in t:
(seconds, milliseconds) = t.split('.')
@ -206,9 +204,13 @@ class LOC(dns.rdata.Rdata):
raise dns.exception.SyntaxError('bad latitude hemisphere value')
longitude[0] = tok.get_int()
if longitude[0] > 180:
raise dns.exception.SyntaxError('longitude > 180')
t = tok.get_string()
if t.isdigit():
longitude[1] = int(t)
if longitude[1] >= 60:
raise dns.exception.SyntaxError('longitude minutes >= 60')
t = tok.get_string()
if '.' in t:
(seconds, milliseconds) = t.split('.')
@ -263,21 +265,26 @@ class LOC(dns.rdata.Rdata):
vprec = float(value) * 100.0 # m -> cm
tok.get_eol()
# Try encoding these now so we raise if they are bad
_encode_size(size, "size")
_encode_size(hprec, "horizontal precision")
_encode_size(vprec, "vertical precision")
return cls(rdclass, rdtype, latitude, longitude, altitude,
size, hprec, vprec)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
milliseconds = (self.latitude[0] * 3600000 +
self.latitude[1] * 60000 +
self.latitude[2] * 1000 +
self.latitude[3]) * self.latitude[4]
latitude = long(0x80000000) + milliseconds
latitude = 0x80000000 + milliseconds
milliseconds = (self.longitude[0] * 3600000 +
self.longitude[1] * 60000 +
self.longitude[2] * 1000 +
self.longitude[3]) * self.longitude[4]
longitude = long(0x80000000) + milliseconds
altitude = long(self.altitude) + long(10000000)
longitude = 0x80000000 + milliseconds
altitude = int(self.altitude) + 10000000
size = _encode_size(self.size, "size")
hprec = _encode_size(self.horizontal_precision, "horizontal precision")
vprec = _encode_size(self.vertical_precision, "vertical precision")
@ -286,21 +293,21 @@ class LOC(dns.rdata.Rdata):
file.write(wire)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(version, size, hprec, vprec, latitude, longitude, altitude) = \
struct.unpack("!BBBBIII", wire[current: current + rdlen])
if latitude > long(0x80000000):
latitude = float(latitude - long(0x80000000)) / 3600000
else:
latitude = -1 * float(long(0x80000000) - latitude) / 3600000
if latitude < -90.0 or latitude > 90.0:
parser.get_struct("!BBBBIII")
if latitude < _MIN_LATITUDE or latitude > _MAX_LATITUDE:
raise dns.exception.FormError("bad latitude")
if longitude > long(0x80000000):
longitude = float(longitude - long(0x80000000)) / 3600000
if latitude > 0x80000000:
latitude = (latitude - 0x80000000) / 3600000
else:
longitude = -1 * float(long(0x80000000) - longitude) / 3600000
if longitude < -180.0 or longitude > 180.0:
latitude = -1 * (0x80000000 - latitude) / 3600000
if longitude < _MIN_LONGITUDE or longitude > _MAX_LONGITUDE:
raise dns.exception.FormError("bad longitude")
if longitude > 0x80000000:
longitude = (longitude - 0x80000000) / 3600000
else:
longitude = -1 * (0x80000000 - longitude) / 3600000
altitude = float(altitude) - 10000000.0
size = _decode_size(size, "size")
hprec = _decode_size(hprec, "horizontal precision")
@ -308,20 +315,12 @@ class LOC(dns.rdata.Rdata):
return cls(rdclass, rdtype, latitude, longitude, altitude,
size, hprec, vprec)
def _get_float_latitude(self):
@property
def float_latitude(self):
"latitude as a floating point value"
return _tuple_to_float(self.latitude)
def _set_float_latitude(self, value):
self.latitude = _float_to_tuple(value)
float_latitude = property(_get_float_latitude, _set_float_latitude,
doc="latitude as a floating point value")
def _get_float_longitude(self):
@property
def float_longitude(self):
"longitude as a floating point value"
return _tuple_to_float(self.longitude)
def _set_float_longitude(self, value):
self.longitude = _float_to_tuple(value)
float_longitude = property(_get_float_longitude, _set_float_longitude,
doc="longitude as a floating point value")

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its

View file

@ -0,0 +1,25 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2006, 2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.txtbase
class NINFO(dns.rdtypes.txtbase.TXTBase):
"""NINFO record"""
# see: draft-reid-dnsext-zs-01

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2004-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -13,114 +15,46 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import struct
import dns.exception
import dns.rdata
import dns.rdatatype
import dns.name
from dns._compat import xrange
import dns.rdtypes.util
class Bitmap(dns.rdtypes.util.Bitmap):
type_name = 'NSEC'
class NSEC(dns.rdata.Rdata):
"""NSEC record
@ivar next: the next name
@type next: dns.name.Name object
@ivar windows: the windowed bitmap list
@type windows: list of (window number, string) tuples"""
"""NSEC record"""
__slots__ = ['next', 'windows']
def __init__(self, rdclass, rdtype, next, windows):
super(NSEC, self).__init__(rdclass, rdtype)
self.next = next
self.windows = windows
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'next', next)
object.__setattr__(self, 'windows', dns.rdata._constify(windows))
def to_text(self, origin=None, relativize=True, **kw):
next = self.next.choose_relativity(origin, relativize)
text = ''
for (window, bitmap) in self.windows:
bits = []
for i in xrange(0, len(bitmap)):
byte = bitmap[i]
for j in xrange(0, 8):
if byte & (0x80 >> j):
bits.append(dns.rdatatype.to_text(window * 256 +
i * 8 + j))
text += (' ' + ' '.join(bits))
return '%s%s' % (next, text)
text = Bitmap(self.windows).to_text()
return '{}{}'.format(next, text)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
next = tok.get_name()
next = next.choose_relativity(origin, relativize)
rdtypes = []
while 1:
token = tok.get().unescape()
if token.is_eol_or_eof():
break
nrdtype = dns.rdatatype.from_text(token.value)
if nrdtype == 0:
raise dns.exception.SyntaxError("NSEC with bit 0")
if nrdtype > 65535:
raise dns.exception.SyntaxError("NSEC with bit > 65535")
rdtypes.append(nrdtype)
rdtypes.sort()
window = 0
octets = 0
prior_rdtype = 0
bitmap = bytearray(b'\0' * 32)
windows = []
for nrdtype in rdtypes:
if nrdtype == prior_rdtype:
continue
prior_rdtype = nrdtype
new_window = nrdtype // 256
if new_window != window:
windows.append((window, bitmap[0:octets]))
bitmap = bytearray(b'\0' * 32)
window = new_window
offset = nrdtype % 256
byte = offset // 8
bit = offset % 8
octets = byte + 1
bitmap[byte] = bitmap[byte] | (0x80 >> bit)
windows.append((window, bitmap[0:octets]))
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
next = tok.get_name(origin, relativize, relativize_to)
windows = Bitmap().from_text(tok)
return cls(rdclass, rdtype, next, windows)
def to_wire(self, file, compress=None, origin=None):
self.next.to_wire(file, None, origin)
for (window, bitmap) in self.windows:
file.write(struct.pack('!BB', window, len(bitmap)))
file.write(bitmap)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
self.next.to_wire(file, None, origin, False)
Bitmap(self.windows).to_wire(file)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
(next, cused) = dns.name.from_wire(wire[: current + rdlen], current)
current += cused
rdlen -= cused
windows = []
while rdlen > 0:
if rdlen < 3:
raise dns.exception.FormError("NSEC too short")
window = wire[current]
octets = wire[current + 1]
if octets == 0 or octets > 32:
raise dns.exception.FormError("bad NSEC octets")
current += 2
rdlen -= 2
if rdlen < octets:
raise dns.exception.FormError("bad NSEC bitmap length")
bitmap = bytearray(wire[current: current + octets].unwrap())
current += octets
rdlen -= octets
windows.append((window, bitmap))
if origin is not None:
next = next.relativize(origin)
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
next = parser.get_name(origin)
windows = Bitmap().from_wire_parser(parser)
return cls(rdclass, rdtype, next, windows)
def choose_relativity(self, origin=None, relativize=True):
self.next = self.next.choose_relativity(origin, relativize)

View file

@ -1,4 +1,6 @@
# Copyright (C) 2004-2007, 2009-2011 Nominum, Inc.
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2004-2017 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
@ -15,24 +17,18 @@
import base64
import binascii
import string
import struct
import dns.exception
import dns.rdata
import dns.rdatatype
from dns._compat import xrange, text_type
import dns.rdtypes.util
try:
b32_hex_to_normal = string.maketrans('0123456789ABCDEFGHIJKLMNOPQRSTUV',
'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567')
b32_normal_to_hex = string.maketrans('ABCDEFGHIJKLMNOPQRSTUVWXYZ234567',
'0123456789ABCDEFGHIJKLMNOPQRSTUV')
except AttributeError:
b32_hex_to_normal = bytes.maketrans(b'0123456789ABCDEFGHIJKLMNOPQRSTUV',
b'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567')
b32_normal_to_hex = bytes.maketrans(b'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567',
b'0123456789ABCDEFGHIJKLMNOPQRSTUV')
b32_hex_to_normal = bytes.maketrans(b'0123456789ABCDEFGHIJKLMNOPQRSTUV',
b'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567')
b32_normal_to_hex = bytes.maketrans(b'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567',
b'0123456789ABCDEFGHIJKLMNOPQRSTUV')
# hash algorithm constants
SHA1 = 1
@ -41,37 +37,28 @@ SHA1 = 1
OPTOUT = 1
class Bitmap(dns.rdtypes.util.Bitmap):
type_name = 'NSEC3'
class NSEC3(dns.rdata.Rdata):
"""NSEC3 record
@ivar algorithm: the hash algorithm number
@type algorithm: int
@ivar flags: the flags
@type flags: int
@ivar iterations: the number of iterations
@type iterations: int
@ivar salt: the salt
@type salt: string
@ivar next: the next name hash
@type next: string
@ivar windows: the windowed bitmap list
@type windows: list of (window number, string) tuples"""
"""NSEC3 record"""
__slots__ = ['algorithm', 'flags', 'iterations', 'salt', 'next', 'windows']
def __init__(self, rdclass, rdtype, algorithm, flags, iterations, salt,
next, windows):
super(NSEC3, self).__init__(rdclass, rdtype)
self.algorithm = algorithm
self.flags = flags
self.iterations = iterations
if isinstance(salt, text_type):
self.salt = salt.encode()
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'flags', flags)
object.__setattr__(self, 'iterations', iterations)
if isinstance(salt, str):
object.__setattr__(self, 'salt', salt.encode())
else:
self.salt = salt
self.next = next
self.windows = windows
object.__setattr__(self, 'salt', salt)
object.__setattr__(self, 'next', next)
object.__setattr__(self, 'windows', dns.rdata._constify(windows))
def to_text(self, origin=None, relativize=True, **kw):
next = base64.b32encode(self.next).translate(
@ -80,70 +67,29 @@ class NSEC3(dns.rdata.Rdata):
salt = '-'
else:
salt = binascii.hexlify(self.salt).decode()
text = u''
for (window, bitmap) in self.windows:
bits = []
for i in xrange(0, len(bitmap)):
byte = bitmap[i]
for j in xrange(0, 8):
if byte & (0x80 >> j):
bits.append(dns.rdatatype.to_text(window * 256 +
i * 8 + j))
text += (u' ' + u' '.join(bits))
return u'%u %u %u %s %s%s' % (self.algorithm, self.flags,
self.iterations, salt, next, text)
text = Bitmap(self.windows).to_text()
return '%u %u %u %s %s%s' % (self.algorithm, self.flags,
self.iterations, salt, next, text)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
algorithm = tok.get_uint8()
flags = tok.get_uint8()
iterations = tok.get_uint16()
salt = tok.get_string()
if salt == u'-':
if salt == '-':
salt = b''
else:
salt = binascii.unhexlify(salt.encode('ascii'))
next = tok.get_string().encode(
'ascii').upper().translate(b32_hex_to_normal)
next = base64.b32decode(next)
rdtypes = []
while 1:
token = tok.get().unescape()
if token.is_eol_or_eof():
break
nrdtype = dns.rdatatype.from_text(token.value)
if nrdtype == 0:
raise dns.exception.SyntaxError("NSEC3 with bit 0")
if nrdtype > 65535:
raise dns.exception.SyntaxError("NSEC3 with bit > 65535")
rdtypes.append(nrdtype)
rdtypes.sort()
window = 0
octets = 0
prior_rdtype = 0
bitmap = bytearray(b'\0' * 32)
windows = []
for nrdtype in rdtypes:
if nrdtype == prior_rdtype:
continue
prior_rdtype = nrdtype
new_window = nrdtype // 256
if new_window != window:
if octets != 0:
windows.append((window, ''.join(bitmap[0:octets])))
bitmap = bytearray(b'\0' * 32)
window = new_window
offset = nrdtype % 256
byte = offset // 8
bit = offset % 8
octets = byte + 1
bitmap[byte] = bitmap[byte] | (0x80 >> bit)
if octets != 0:
windows.append((window, bitmap[0:octets]))
windows = Bitmap().from_text(tok)
return cls(rdclass, rdtype, algorithm, flags, iterations, salt, next,
windows)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
l = len(self.salt)
file.write(struct.pack("!BBHB", self.algorithm, self.flags,
self.iterations, l))
@ -151,42 +97,13 @@ class NSEC3(dns.rdata.Rdata):
l = len(self.next)
file.write(struct.pack("!B", l))
file.write(self.next)
for (window, bitmap) in self.windows:
file.write(struct.pack("!BB", window, len(bitmap)))
file.write(bitmap)
Bitmap(self.windows).to_wire(file)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
(algorithm, flags, iterations, slen) = \
struct.unpack('!BBHB', wire[current: current + 5])
current += 5
rdlen -= 5
salt = wire[current: current + slen].unwrap()
current += slen
rdlen -= slen
nlen = wire[current]
current += 1
rdlen -= 1
next = wire[current: current + nlen].unwrap()
current += nlen
rdlen -= nlen
windows = []
while rdlen > 0:
if rdlen < 3:
raise dns.exception.FormError("NSEC3 too short")
window = wire[current]
octets = wire[current + 1]
if octets == 0 or octets > 32:
raise dns.exception.FormError("bad NSEC3 octets")
current += 2
rdlen -= 2
if rdlen < octets:
raise dns.exception.FormError("bad NSEC3 bitmap length")
bitmap = bytearray(wire[current: current + octets].unwrap())
current += octets
rdlen -= octets
windows.append((window, bitmap))
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(algorithm, flags, iterations) = parser.get_struct('!BBH')
salt = parser.get_counted_bytes()
next = parser.get_counted_bytes()
windows = Bitmap().from_wire_parser(parser)
return cls(rdclass, rdtype, algorithm, flags, iterations, salt, next,
windows)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2004-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -18,33 +20,23 @@ import binascii
import dns.exception
import dns.rdata
from dns._compat import text_type
class NSEC3PARAM(dns.rdata.Rdata):
"""NSEC3PARAM record
@ivar algorithm: the hash algorithm number
@type algorithm: int
@ivar flags: the flags
@type flags: int
@ivar iterations: the number of iterations
@type iterations: int
@ivar salt: the salt
@type salt: string"""
"""NSEC3PARAM record"""
__slots__ = ['algorithm', 'flags', 'iterations', 'salt']
def __init__(self, rdclass, rdtype, algorithm, flags, iterations, salt):
super(NSEC3PARAM, self).__init__(rdclass, rdtype)
self.algorithm = algorithm
self.flags = flags
self.iterations = iterations
if isinstance(salt, text_type):
self.salt = salt.encode()
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'flags', flags)
object.__setattr__(self, 'iterations', iterations)
if isinstance(salt, str):
object.__setattr__(self, 'salt', salt.encode())
else:
self.salt = salt
object.__setattr__(self, 'salt', salt)
def to_text(self, origin=None, relativize=True, **kw):
if self.salt == b'':
@ -55,7 +47,8 @@ class NSEC3PARAM(dns.rdata.Rdata):
salt)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
algorithm = tok.get_uint8()
flags = tok.get_uint8()
iterations = tok.get_uint16()
@ -67,23 +60,14 @@ class NSEC3PARAM(dns.rdata.Rdata):
tok.get_eol()
return cls(rdclass, rdtype, algorithm, flags, iterations, salt)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
l = len(self.salt)
file.write(struct.pack("!BBHB", self.algorithm, self.flags,
self.iterations, l))
file.write(self.salt)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
(algorithm, flags, iterations, slen) = \
struct.unpack('!BBHB',
wire[current: current + 5])
current += 5
rdlen -= 5
salt = wire[current: current + slen].unwrap()
current += slen
rdlen -= slen
if rdlen != 0:
raise dns.exception.FormError
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(algorithm, flags, iterations) = parser.get_struct('!BBH')
salt = parser.get_counted_bytes()
return cls(rdclass, rdtype, algorithm, flags, iterations, salt)

View file

@ -0,0 +1,50 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2016 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import base64
import dns.exception
import dns.rdata
import dns.tokenizer
class OPENPGPKEY(dns.rdata.Rdata):
"""OPENPGPKEY record"""
# see: RFC 7929
def __init__(self, rdclass, rdtype, key):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'key', key)
def to_text(self, origin=None, relativize=True, **kw):
return dns.rdata._base64ify(self.key)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
b64 = tok.concatenate_remaining_identifiers().encode()
key = base64.b64decode(b64)
return cls(rdclass, rdtype, key)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(self.key)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
key = parser.get_remaining()
return cls(rdclass, rdtype, key)

View file

@ -0,0 +1,67 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2001-2017 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import struct
import dns.edns
import dns.exception
import dns.rdata
class OPT(dns.rdata.Rdata):
"""OPT record"""
__slots__ = ['options']
def __init__(self, rdclass, rdtype, options):
"""Initialize an OPT rdata.
*rdclass*, an ``int`` is the rdataclass of the Rdata,
which is also the payload size.
*rdtype*, an ``int`` is the rdatatype of the Rdata.
*options*, a tuple of ``bytes``
"""
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'options', dns.rdata._constify(options))
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
for opt in self.options:
owire = opt.to_wire()
file.write(struct.pack("!HH", opt.otype, len(owire)))
file.write(owire)
def to_text(self, origin=None, relativize=True, **kw):
return ' '.join(opt.to_text() for opt in self.options)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
options = []
while parser.remaining() > 0:
(otype, olen) = parser.get_struct('!HH')
with parser.restrict_to(olen):
opt = dns.edns.option_from_wire_parser(otype, parser)
options.append(opt)
return cls(rdclass, rdtype, options)
@property
def payload(self):
"payload size"
return self.rdclass

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -20,61 +22,36 @@ import dns.name
class RP(dns.rdata.Rdata):
"""RP record
"""RP record"""
@ivar mbox: The responsible person's mailbox
@type mbox: dns.name.Name object
@ivar txt: The owner name of a node with TXT records, or the root name
if no TXT records are associated with this RP.
@type txt: dns.name.Name object
@see: RFC 1183"""
# see: RFC 1183
__slots__ = ['mbox', 'txt']
def __init__(self, rdclass, rdtype, mbox, txt):
super(RP, self).__init__(rdclass, rdtype)
self.mbox = mbox
self.txt = txt
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'mbox', mbox)
object.__setattr__(self, 'txt', txt)
def to_text(self, origin=None, relativize=True, **kw):
mbox = self.mbox.choose_relativity(origin, relativize)
txt = self.txt.choose_relativity(origin, relativize)
return "%s %s" % (str(mbox), str(txt))
return "{} {}".format(str(mbox), str(txt))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
mbox = tok.get_name()
txt = tok.get_name()
mbox = mbox.choose_relativity(origin, relativize)
txt = txt.choose_relativity(origin, relativize)
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
mbox = tok.get_name(origin, relativize, relativize_to)
txt = tok.get_name(origin, relativize, relativize_to)
tok.get_eol()
return cls(rdclass, rdtype, mbox, txt)
def to_wire(self, file, compress=None, origin=None):
self.mbox.to_wire(file, None, origin)
self.txt.to_wire(file, None, origin)
def to_digestable(self, origin=None):
return self.mbox.to_digestable(origin) + \
self.txt.to_digestable(origin)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
self.mbox.to_wire(file, None, origin, canonicalize)
self.txt.to_wire(file, None, origin, canonicalize)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
(mbox, cused) = dns.name.from_wire(wire[: current + rdlen],
current)
current += cused
rdlen -= cused
if rdlen <= 0:
raise dns.exception.FormError
(txt, cused) = dns.name.from_wire(wire[: current + rdlen],
current)
if cused != rdlen:
raise dns.exception.FormError
if origin is not None:
mbox = mbox.relativize(origin)
txt = txt.relativize(origin)
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
mbox = parser.get_name(origin)
txt = parser.get_name(origin)
return cls(rdclass, rdtype, mbox, txt)
def choose_relativity(self, origin=None, relativize=True):
self.mbox = self.mbox.choose_relativity(origin, relativize)
self.txt = self.txt.choose_relativity(origin, relativize)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2004-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -30,6 +32,8 @@ class BadSigTime(dns.exception.DNSException):
def sigtime_to_posixtime(what):
if len(what) <= 10 and what.isdigit():
return int(what)
if len(what) != 14:
raise BadSigTime
year = int(what[0:4])
@ -48,26 +52,7 @@ def posixtime_to_sigtime(what):
class RRSIG(dns.rdata.Rdata):
"""RRSIG record
@ivar type_covered: the rdata type this signature covers
@type type_covered: int
@ivar algorithm: the algorithm used for the sig
@type algorithm: int
@ivar labels: number of labels
@type labels: int
@ivar original_ttl: the original TTL
@type original_ttl: long
@ivar expiration: signature expiration time
@type expiration: long
@ivar inception: signature inception time
@type inception: long
@ivar key_tag: the key tag
@type key_tag: int
@ivar signer: the signer
@type signer: dns.name.Name object
@ivar signature: the signature
@type signature: string"""
"""RRSIG record"""
__slots__ = ['type_covered', 'algorithm', 'labels', 'original_ttl',
'expiration', 'inception', 'key_tag', 'signer',
@ -76,16 +61,16 @@ class RRSIG(dns.rdata.Rdata):
def __init__(self, rdclass, rdtype, type_covered, algorithm, labels,
original_ttl, expiration, inception, key_tag, signer,
signature):
super(RRSIG, self).__init__(rdclass, rdtype)
self.type_covered = type_covered
self.algorithm = algorithm
self.labels = labels
self.original_ttl = original_ttl
self.expiration = expiration
self.inception = inception
self.key_tag = key_tag
self.signer = signer
self.signature = signature
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'type_covered', type_covered)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'labels', labels)
object.__setattr__(self, 'original_ttl', original_ttl)
object.__setattr__(self, 'expiration', expiration)
object.__setattr__(self, 'inception', inception)
object.__setattr__(self, 'key_tag', key_tag)
object.__setattr__(self, 'signer', signer)
object.__setattr__(self, 'signature', signature)
def covers(self):
return self.type_covered
@ -104,7 +89,8 @@ class RRSIG(dns.rdata.Rdata):
)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
type_covered = dns.rdatatype.from_text(tok.get_string())
algorithm = dns.dnssec.algorithm_from_text(tok.get_string())
labels = tok.get_int()
@ -112,45 +98,25 @@ class RRSIG(dns.rdata.Rdata):
expiration = sigtime_to_posixtime(tok.get_string())
inception = sigtime_to_posixtime(tok.get_string())
key_tag = tok.get_int()
signer = tok.get_name()
signer = signer.choose_relativity(origin, relativize)
chunks = []
while 1:
t = tok.get().unescape()
if t.is_eol_or_eof():
break
if not t.is_identifier():
raise dns.exception.SyntaxError
chunks.append(t.value.encode())
b64 = b''.join(chunks)
signer = tok.get_name(origin, relativize, relativize_to)
b64 = tok.concatenate_remaining_identifiers().encode()
signature = base64.b64decode(b64)
return cls(rdclass, rdtype, type_covered, algorithm, labels,
original_ttl, expiration, inception, key_tag, signer,
signature)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
header = struct.pack('!HBBIIIH', self.type_covered,
self.algorithm, self.labels,
self.original_ttl, self.expiration,
self.inception, self.key_tag)
file.write(header)
self.signer.to_wire(file, None, origin)
self.signer.to_wire(file, None, origin, canonicalize)
file.write(self.signature)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
header = struct.unpack('!HBBIIIH', wire[current: current + 18])
current += 18
rdlen -= 18
(signer, cused) = dns.name.from_wire(wire[: current + rdlen], current)
current += cused
rdlen -= cused
if origin is not None:
signer = signer.relativize(origin)
signature = wire[current: current + rdlen].unwrap()
return cls(rdclass, rdtype, header[0], header[1], header[2],
header[3], header[4], header[5], header[6], signer,
signature)
def choose_relativity(self, origin=None, relativize=True):
self.signer = self.signer.choose_relativity(origin, relativize)
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
header = parser.get_struct('!HBBIIIH')
signer = parser.get_name(origin)
signature = parser.get_remaining()
return cls(rdclass, rdtype, *header, signer, signature)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -22,38 +24,23 @@ import dns.name
class SOA(dns.rdata.Rdata):
"""SOA record
"""SOA record"""
@ivar mname: the SOA MNAME (master name) field
@type mname: dns.name.Name object
@ivar rname: the SOA RNAME (responsible name) field
@type rname: dns.name.Name object
@ivar serial: The zone's serial number
@type serial: int
@ivar refresh: The zone's refresh value (in seconds)
@type refresh: int
@ivar retry: The zone's retry value (in seconds)
@type retry: int
@ivar expire: The zone's expiration value (in seconds)
@type expire: int
@ivar minimum: The zone's negative caching time (in seconds, called
"minimum" for historical reasons)
@type minimum: int
@see: RFC 1035"""
# see: RFC 1035
__slots__ = ['mname', 'rname', 'serial', 'refresh', 'retry', 'expire',
'minimum']
def __init__(self, rdclass, rdtype, mname, rname, serial, refresh, retry,
expire, minimum):
super(SOA, self).__init__(rdclass, rdtype)
self.mname = mname
self.rname = rname
self.serial = serial
self.refresh = refresh
self.retry = retry
self.expire = expire
self.minimum = minimum
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'mname', mname)
object.__setattr__(self, 'rname', rname)
object.__setattr__(self, 'serial', serial)
object.__setattr__(self, 'refresh', refresh)
object.__setattr__(self, 'retry', retry)
object.__setattr__(self, 'expire', expire)
object.__setattr__(self, 'minimum', minimum)
def to_text(self, origin=None, relativize=True, **kw):
mname = self.mname.choose_relativity(origin, relativize)
@ -63,11 +50,10 @@ class SOA(dns.rdata.Rdata):
self.expire, self.minimum)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
mname = tok.get_name()
rname = tok.get_name()
mname = mname.choose_relativity(origin, relativize)
rname = rname.choose_relativity(origin, relativize)
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
mname = tok.get_name(origin, relativize, relativize_to)
rname = tok.get_name(origin, relativize, relativize_to)
serial = tok.get_uint32()
refresh = tok.get_ttl()
retry = tok.get_ttl()
@ -77,38 +63,15 @@ class SOA(dns.rdata.Rdata):
return cls(rdclass, rdtype, mname, rname, serial, refresh, retry,
expire, minimum)
def to_wire(self, file, compress=None, origin=None):
self.mname.to_wire(file, compress, origin)
self.rname.to_wire(file, compress, origin)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
self.mname.to_wire(file, compress, origin, canonicalize)
self.rname.to_wire(file, compress, origin, canonicalize)
five_ints = struct.pack('!IIIII', self.serial, self.refresh,
self.retry, self.expire, self.minimum)
file.write(five_ints)
def to_digestable(self, origin=None):
return self.mname.to_digestable(origin) + \
self.rname.to_digestable(origin) + \
struct.pack('!IIIII', self.serial, self.refresh,
self.retry, self.expire, self.minimum)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
(mname, cused) = dns.name.from_wire(wire[: current + rdlen], current)
current += cused
rdlen -= cused
(rname, cused) = dns.name.from_wire(wire[: current + rdlen], current)
current += cused
rdlen -= cused
if rdlen != 20:
raise dns.exception.FormError
five_ints = struct.unpack('!IIIII',
wire[current: current + rdlen])
if origin is not None:
mname = mname.relativize(origin)
rname = rname.relativize(origin)
return cls(rdclass, rdtype, mname, rname,
five_ints[0], five_ints[1], five_ints[2], five_ints[3],
five_ints[4])
def choose_relativity(self, origin=None, relativize=True):
self.mname = self.mname.choose_relativity(origin, relativize)
self.rname = self.rname.choose_relativity(origin, relativize)
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
mname = parser.get_name(origin)
rname = parser.get_name(origin)
return cls(rdclass, rdtype, mname, rname, *parser.get_struct('!IIIII'))

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2006, 2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -18,6 +20,6 @@ import dns.rdtypes.txtbase
class SPF(dns.rdtypes.txtbase.TXTBase):
"""SPF record
"""SPF record"""
@see: RFC 4408"""
# see: RFC 4408

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2005-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -22,24 +24,18 @@ import dns.rdatatype
class SSHFP(dns.rdata.Rdata):
"""SSHFP record
"""SSHFP record"""
@ivar algorithm: the algorithm
@type algorithm: int
@ivar fp_type: the digest type
@type fp_type: int
@ivar fingerprint: the fingerprint
@type fingerprint: string
@see: draft-ietf-secsh-dns-05.txt"""
# See RFC 4255
__slots__ = ['algorithm', 'fp_type', 'fingerprint']
def __init__(self, rdclass, rdtype, algorithm, fp_type,
fingerprint):
super(SSHFP, self).__init__(rdclass, rdtype)
self.algorithm = algorithm
self.fp_type = fp_type
self.fingerprint = fingerprint
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'fp_type', fp_type)
object.__setattr__(self, 'fingerprint', fingerprint)
def to_text(self, origin=None, relativize=True, **kw):
return '%d %d %s' % (self.algorithm,
@ -48,31 +44,21 @@ class SSHFP(dns.rdata.Rdata):
chunksize=128))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
algorithm = tok.get_uint8()
fp_type = tok.get_uint8()
chunks = []
while 1:
t = tok.get().unescape()
if t.is_eol_or_eof():
break
if not t.is_identifier():
raise dns.exception.SyntaxError
chunks.append(t.value.encode())
fingerprint = b''.join(chunks)
fingerprint = tok.concatenate_remaining_identifiers().encode()
fingerprint = binascii.unhexlify(fingerprint)
return cls(rdclass, rdtype, algorithm, fp_type, fingerprint)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
header = struct.pack("!BB", self.algorithm, self.fp_type)
file.write(header)
file.write(self.fingerprint)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
header = struct.unpack("!BB", wire[current: current + 2])
current += 2
rdlen -= 2
fingerprint = wire[current: current + rdlen].unwrap()
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
header = parser.get_struct("BB")
fingerprint = parser.get_remaining()
return cls(rdclass, rdtype, header[0], header[1], fingerprint)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2005-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -22,27 +24,19 @@ import dns.rdatatype
class TLSA(dns.rdata.Rdata):
"""TLSA record
"""TLSA record"""
@ivar usage: The certificate usage
@type usage: int
@ivar selector: The selector field
@type selector: int
@ivar mtype: The 'matching type' field
@type mtype: int
@ivar cert: The 'Certificate Association Data' field
@type cert: string
@see: RFC 6698"""
# see: RFC 6698
__slots__ = ['usage', 'selector', 'mtype', 'cert']
def __init__(self, rdclass, rdtype, usage, selector,
mtype, cert):
super(TLSA, self).__init__(rdclass, rdtype)
self.usage = usage
self.selector = selector
self.mtype = mtype
self.cert = cert
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'usage', usage)
object.__setattr__(self, 'selector', selector)
object.__setattr__(self, 'mtype', mtype)
object.__setattr__(self, 'cert', cert)
def to_text(self, origin=None, relativize=True, **kw):
return '%d %d %d %s' % (self.usage,
@ -52,32 +46,22 @@ class TLSA(dns.rdata.Rdata):
chunksize=128))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
usage = tok.get_uint8()
selector = tok.get_uint8()
mtype = tok.get_uint8()
cert_chunks = []
while 1:
t = tok.get().unescape()
if t.is_eol_or_eof():
break
if not t.is_identifier():
raise dns.exception.SyntaxError
cert_chunks.append(t.value.encode())
cert = b''.join(cert_chunks)
cert = tok.concatenate_remaining_identifiers().encode()
cert = binascii.unhexlify(cert)
return cls(rdclass, rdtype, usage, selector, mtype, cert)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
header = struct.pack("!BBB", self.usage, self.selector, self.mtype)
file.write(header)
file.write(self.cert)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
header = struct.unpack("!BBB", wire[current: current + 3])
current += 3
rdlen -= 3
cert = wire[current: current + rdlen].unwrap()
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
header = parser.get_struct("BBB")
cert = parser.get_remaining()
return cls(rdclass, rdtype, header[0], header[1], header[2], cert)

View file

@ -0,0 +1,91 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2001-2017 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import struct
import dns.exception
import dns.rdata
class TSIG(dns.rdata.Rdata):
"""TSIG record"""
__slots__ = ['algorithm', 'time_signed', 'fudge', 'mac',
'original_id', 'error', 'other']
def __init__(self, rdclass, rdtype, algorithm, time_signed, fudge, mac,
original_id, error, other):
"""Initialize a TSIG rdata.
*rdclass*, an ``int`` is the rdataclass of the Rdata.
*rdtype*, an ``int`` is the rdatatype of the Rdata.
*algorithm*, a ``dns.name.Name``.
*time_signed*, an ``int``.
*fudge*, an ``int`.
*mac*, a ``bytes``
*original_id*, an ``int``
*error*, an ``int``
*other*, a ``bytes``
"""
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'time_signed', time_signed)
object.__setattr__(self, 'fudge', fudge)
object.__setattr__(self, 'mac', dns.rdata._constify(mac))
object.__setattr__(self, 'original_id', original_id)
object.__setattr__(self, 'error', error)
object.__setattr__(self, 'other', dns.rdata._constify(other))
def to_text(self, origin=None, relativize=True, **kw):
algorithm = self.algorithm.choose_relativity(origin, relativize)
return f"{algorithm} {self.fudge} {self.time_signed} " + \
f"{len(self.mac)} {dns.rdata._base64ify(self.mac, 0)} " + \
f"{self.original_id} {self.error} " + \
f"{len(self.other)} {dns.rdata._base64ify(self.other, 0)}"
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
self.algorithm.to_wire(file, None, origin, False)
file.write(struct.pack('!HIHH',
(self.time_signed >> 32) & 0xffff,
self.time_signed & 0xffffffff,
self.fudge,
len(self.mac)))
file.write(self.mac)
file.write(struct.pack('!HHH', self.original_id, self.error,
len(self.other)))
file.write(self.other)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
algorithm = parser.get_name(origin)
(time_hi, time_lo, fudge) = parser.get_struct('!HIH')
time_signed = (time_hi << 32) + time_lo
mac = parser.get_counted_bytes(2)
(original_id, error) = parser.get_struct('!HH')
other = parser.get_counted_bytes(2)
return cls(rdclass, rdtype, algorithm, time_signed, fudge, mac,
original_id, error, other)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
# Copyright (C) 2015 Red Hat, Inc.
#
@ -19,40 +21,34 @@ import struct
import dns.exception
import dns.rdata
import dns.name
from dns._compat import text_type
class URI(dns.rdata.Rdata):
"""URI record
"""URI record"""
@ivar priority: the priority
@type priority: int
@ivar weight: the weight
@type weight: int
@ivar target: the target host
@type target: dns.name.Name object
@see: draft-faltstrom-uri-13"""
# see RFC 7553
__slots__ = ['priority', 'weight', 'target']
def __init__(self, rdclass, rdtype, priority, weight, target):
super(URI, self).__init__(rdclass, rdtype)
self.priority = priority
self.weight = weight
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'priority', priority)
object.__setattr__(self, 'weight', weight)
if len(target) < 1:
raise dns.exception.SyntaxError("URI target cannot be empty")
if isinstance(target, text_type):
self.target = target.encode()
if isinstance(target, str):
object.__setattr__(self, 'target', target.encode())
else:
self.target = target
object.__setattr__(self, 'target', target)
def to_text(self, origin=None, relativize=True, **kw):
return '%d %d "%s"' % (self.priority, self.weight,
self.target.decode())
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
priority = tok.get_uint16()
weight = tok.get_uint16()
target = tok.get().unescape()
@ -61,21 +57,15 @@ class URI(dns.rdata.Rdata):
tok.get_eol()
return cls(rdclass, rdtype, priority, weight, target.value)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
two_ints = struct.pack("!HH", self.priority, self.weight)
file.write(two_ints)
file.write(self.target)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
if rdlen < 5:
raise dns.exception.FormError('URI RR is shorter than 5 octets')
(priority, weight) = struct.unpack('!HH', wire[current: current + 4])
current += 4
rdlen -= 4
target = wire[current: current + rdlen]
current += rdlen
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(priority, weight) = parser.get_struct('!HH')
target = parser.get_remaining()
if len(target) == 0:
raise dns.exception.FormError('URI target may not be empty')
return cls(rdclass, rdtype, priority, weight, target)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -18,48 +20,40 @@ import struct
import dns.exception
import dns.rdata
import dns.tokenizer
from dns._compat import text_type
class X25(dns.rdata.Rdata):
"""X25 record
"""X25 record"""
@ivar address: the PSDN address
@type address: string
@see: RFC 1183"""
# see RFC 1183
__slots__ = ['address']
def __init__(self, rdclass, rdtype, address):
super(X25, self).__init__(rdclass, rdtype)
if isinstance(address, text_type):
self.address = address.encode()
super().__init__(rdclass, rdtype)
if isinstance(address, str):
object.__setattr__(self, 'address', address.encode())
else:
self.address = address
object.__setattr__(self, 'address', address)
def to_text(self, origin=None, relativize=True, **kw):
return '"%s"' % dns.rdata._escapify(self.address)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
address = tok.get_string()
tok.get_eol()
return cls(rdclass, rdtype, address)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
l = len(self.address)
assert l < 256
file.write(struct.pack('!B', l))
file.write(self.address)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
l = wire[current]
current += 1
rdlen -= 1
if l != rdlen:
raise dns.exception.FormError
address = wire[current: current + l].unwrap()
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
address = parser.get_counted_bytes()
return cls(rdclass, rdtype, address)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -17,10 +19,13 @@
__all__ = [
'AFSDB',
'AVC',
'CAA',
'CDNSKEY',
'CDS',
'CERT',
'CNAME',
'CSYNC',
'DLV',
'DNAME',
'DNSKEY',
@ -37,7 +42,8 @@ __all__ = [
'NSEC',
'NSEC3',
'NSEC3PARAM',
'TLSA',
'OPENPGPKEY',
'OPT',
'PTR',
'RP',
'RRSIG',
@ -45,6 +51,9 @@ __all__ = [
'SOA',
'SPF',
'SSHFP',
'TLSA',
'TSIG',
'TXT',
'URI',
'X25',
]

56
lib/dns/rdtypes/CH/A.py Normal file
View file

@ -0,0 +1,56 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.rdtypes.mxbase
import struct
class A(dns.rdata.Rdata):
"""A record for Chaosnet"""
# domain: the domain of the address
# address: the 16-bit address
__slots__ = ['domain', 'address']
def __init__(self, rdclass, rdtype, domain, address):
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'domain', domain)
object.__setattr__(self, 'address', address)
def to_text(self, origin=None, relativize=True, **kw):
domain = self.domain.choose_relativity(origin, relativize)
return '%s %o' % (domain, self.address)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
domain = tok.get_name(origin, relativize, relativize_to)
address = tok.get_uint16(base=8)
tok.get_eol()
return cls(rdclass, rdtype, domain, address)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
self.domain.to_wire(file, compress, origin, canonicalize)
pref = struct.pack("!H", self.address)
file.write(pref)
@classmethod
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
domain = parser.get_name(origin)
address = parser.get_uint16()
return cls(rdclass, rdtype, domain, address)

View file

@ -0,0 +1,22 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""Class CH rdata type classes."""
__all__ = [
'A',
]

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -21,33 +23,30 @@ import dns.tokenizer
class A(dns.rdata.Rdata):
"""A record.
@ivar address: an IPv4 address
@type address: string (in the standard "dotted quad" format)"""
"""A record."""
__slots__ = ['address']
def __init__(self, rdclass, rdtype, address):
super(A, self).__init__(rdclass, rdtype)
super().__init__(rdclass, rdtype)
# check that it's OK
dns.ipv4.inet_aton(address)
self.address = address
object.__setattr__(self, 'address', address)
def to_text(self, origin=None, relativize=True, **kw):
return self.address
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
address = tok.get_identifier()
tok.get_eol()
return cls(rdclass, rdtype, address)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(dns.ipv4.inet_aton(self.address))
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
address = dns.ipv4.inet_ntoa(wire[current: current + rdlen]).decode()
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
address = dns.ipv4.inet_ntoa(parser.get_remaining())
return cls(rdclass, rdtype, address)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -14,41 +16,37 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import dns.exception
import dns.inet
import dns.ipv6
import dns.rdata
import dns.tokenizer
class AAAA(dns.rdata.Rdata):
"""AAAA record.
@ivar address: an IPv6 address
@type address: string (in the standard IPv6 format)"""
"""AAAA record."""
__slots__ = ['address']
def __init__(self, rdclass, rdtype, address):
super(AAAA, self).__init__(rdclass, rdtype)
super().__init__(rdclass, rdtype)
# check that it's OK
dns.inet.inet_pton(dns.inet.AF_INET6, address)
self.address = address
dns.ipv6.inet_aton(address)
object.__setattr__(self, 'address', address)
def to_text(self, origin=None, relativize=True, **kw):
return self.address
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
address = tok.get_identifier()
tok.get_eol()
return cls(rdclass, rdtype, address)
def to_wire(self, file, compress=None, origin=None):
file.write(dns.inet.inet_pton(dns.inet.AF_INET6, self.address))
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(dns.ipv6.inet_aton(self.address))
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
address = dns.inet.inet_ntop(dns.inet.AF_INET6,
wire[current: current + rdlen])
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
address = dns.ipv6.inet_ntoa(parser.get_remaining())
return cls(rdclass, rdtype, address)

View file

@ -1,4 +1,6 @@
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2017 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
@ -13,29 +15,19 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import struct
import binascii
import codecs
import struct
import dns.exception
import dns.inet
import dns.ipv4
import dns.ipv6
import dns.rdata
import dns.tokenizer
from dns._compat import xrange
class APLItem:
class APLItem(object):
"""An APL list item.
@ivar family: the address family (IANA address family registry)
@type family: int
@ivar negation: is this item negated?
@type negation: bool
@ivar address: the address
@type address: string
@ivar prefix: the prefix length
@type prefix: int
"""
"""An APL list item."""
__slots__ = ['family', 'negation', 'address', 'prefix']
@ -53,17 +45,17 @@ class APLItem(object):
def to_wire(self, file):
if self.family == 1:
address = dns.inet.inet_pton(dns.inet.AF_INET, self.address)
address = dns.ipv4.inet_aton(self.address)
elif self.family == 2:
address = dns.inet.inet_pton(dns.inet.AF_INET6, self.address)
address = dns.ipv6.inet_aton(self.address)
else:
address = binascii.unhexlify(self.address)
#
# Truncate least significant zero bytes.
#
last = 0
for i in xrange(len(address) - 1, -1, -1):
if address[i] != chr(0):
for i in range(len(address) - 1, -1, -1):
if address[i] != 0:
last = i + 1
break
address = address[0: last]
@ -78,25 +70,24 @@ class APLItem(object):
class APL(dns.rdata.Rdata):
"""APL record.
"""APL record."""
@ivar items: a list of APL items
@type items: list of APL_Item
@see: RFC 3123"""
# see: RFC 3123
__slots__ = ['items']
def __init__(self, rdclass, rdtype, items):
super(APL, self).__init__(rdclass, rdtype)
self.items = items
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'items', dns.rdata._constify(items))
def to_text(self, origin=None, relativize=True, **kw):
return ' '.join(map(lambda x: str(x), self.items))
return ' '.join(map(str, self.items))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
items = []
while 1:
while True:
token = tok.get().unescape()
if token.is_eol_or_eof():
break
@ -115,48 +106,38 @@ class APL(dns.rdata.Rdata):
return cls(rdclass, rdtype, items)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
for item in self.items:
item.to_wire(file)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
items = []
while 1:
if rdlen == 0:
break
if rdlen < 4:
raise dns.exception.FormError
header = struct.unpack('!HBB', wire[current: current + 4])
while parser.remaining() > 0:
header = parser.get_struct('!HBB')
afdlen = header[2]
if afdlen > 127:
negation = True
afdlen -= 128
else:
negation = False
current += 4
rdlen -= 4
if rdlen < afdlen:
raise dns.exception.FormError
address = wire[current: current + afdlen].unwrap()
address = parser.get_bytes(afdlen)
l = len(address)
if header[0] == 1:
if l < 4:
address += '\x00' * (4 - l)
address = dns.inet.inet_ntop(dns.inet.AF_INET, address)
address += b'\x00' * (4 - l)
address = dns.ipv4.inet_ntoa(address)
elif header[0] == 2:
if l < 16:
address += '\x00' * (16 - l)
address = dns.inet.inet_ntop(dns.inet.AF_INET6, address)
address += b'\x00' * (16 - l)
address = dns.ipv6.inet_ntoa(address)
else:
#
# This isn't really right according to the RFC, but it
# seems better than throwing an exception
#
address = address.encode('hex_codec')
current += afdlen
rdlen -= afdlen
address = codecs.encode(address, 'hex_codec')
item = APLItem(header[0], negation, address, header[1])
items.append(item)
return cls(rdclass, rdtype, items)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2006, 2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -20,41 +22,30 @@ import dns.exception
class DHCID(dns.rdata.Rdata):
"""DHCID record
"""DHCID record"""
@ivar data: the data (the content of the RR is opaque as far as the
DNS is concerned)
@type data: string
@see: RFC 4701"""
# see: RFC 4701
__slots__ = ['data']
def __init__(self, rdclass, rdtype, data):
super(DHCID, self).__init__(rdclass, rdtype)
self.data = data
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'data', data)
def to_text(self, origin=None, relativize=True, **kw):
return dns.rdata._base64ify(self.data)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
chunks = []
while 1:
t = tok.get().unescape()
if t.is_eol_or_eof():
break
if not t.is_identifier():
raise dns.exception.SyntaxError
chunks.append(t.value.encode())
b64 = b''.join(chunks)
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
b64 = tok.concatenate_remaining_identifiers().encode()
data = base64.b64decode(b64)
return cls(rdclass, rdtype, data)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(self.data)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
data = wire[current: current + rdlen].unwrap()
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
data = parser.get_remaining()
return cls(rdclass, rdtype, data)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2006, 2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -17,133 +19,63 @@ import struct
import base64
import dns.exception
import dns.inet
import dns.name
import dns.rdtypes.util
class Gateway(dns.rdtypes.util.Gateway):
name = 'IPSECKEY gateway'
class IPSECKEY(dns.rdata.Rdata):
"""IPSECKEY record
"""IPSECKEY record"""
@ivar precedence: the precedence for this key data
@type precedence: int
@ivar gateway_type: the gateway type
@type gateway_type: int
@ivar algorithm: the algorithm to use
@type algorithm: int
@ivar gateway: the public key
@type gateway: None, IPv4 address, IPV6 address, or domain name
@ivar key: the public key
@type key: string
@see: RFC 4025"""
# see: RFC 4025
__slots__ = ['precedence', 'gateway_type', 'algorithm', 'gateway', 'key']
def __init__(self, rdclass, rdtype, precedence, gateway_type, algorithm,
gateway, key):
super(IPSECKEY, self).__init__(rdclass, rdtype)
if gateway_type == 0:
if gateway != '.' and gateway is not None:
raise SyntaxError('invalid gateway for gateway type 0')
gateway = None
elif gateway_type == 1:
# check that it's OK
dns.inet.inet_pton(dns.inet.AF_INET, gateway)
elif gateway_type == 2:
# check that it's OK
dns.inet.inet_pton(dns.inet.AF_INET6, gateway)
elif gateway_type == 3:
pass
else:
raise SyntaxError(
'invalid IPSECKEY gateway type: %d' % gateway_type)
self.precedence = precedence
self.gateway_type = gateway_type
self.algorithm = algorithm
self.gateway = gateway
self.key = key
super().__init__(rdclass, rdtype)
Gateway(gateway_type, gateway).check()
object.__setattr__(self, 'precedence', precedence)
object.__setattr__(self, 'gateway_type', gateway_type)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'gateway', gateway)
object.__setattr__(self, 'key', key)
def to_text(self, origin=None, relativize=True, **kw):
if self.gateway_type == 0:
gateway = '.'
elif self.gateway_type == 1:
gateway = self.gateway
elif self.gateway_type == 2:
gateway = self.gateway
elif self.gateway_type == 3:
gateway = str(self.gateway.choose_relativity(origin, relativize))
else:
raise ValueError('invalid gateway type')
gateway = Gateway(self.gateway_type, self.gateway).to_text(origin,
relativize)
return '%d %d %d %s %s' % (self.precedence, self.gateway_type,
self.algorithm, gateway,
dns.rdata._base64ify(self.key))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
precedence = tok.get_uint8()
gateway_type = tok.get_uint8()
algorithm = tok.get_uint8()
if gateway_type == 3:
gateway = tok.get_name().choose_relativity(origin, relativize)
else:
gateway = tok.get_string()
chunks = []
while 1:
t = tok.get().unescape()
if t.is_eol_or_eof():
break
if not t.is_identifier():
raise dns.exception.SyntaxError
chunks.append(t.value.encode())
b64 = b''.join(chunks)
gateway = Gateway(gateway_type).from_text(tok, origin, relativize,
relativize_to)
b64 = tok.concatenate_remaining_identifiers().encode()
key = base64.b64decode(b64)
return cls(rdclass, rdtype, precedence, gateway_type, algorithm,
gateway, key)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
header = struct.pack("!BBB", self.precedence, self.gateway_type,
self.algorithm)
file.write(header)
if self.gateway_type == 0:
pass
elif self.gateway_type == 1:
file.write(dns.inet.inet_pton(dns.inet.AF_INET, self.gateway))
elif self.gateway_type == 2:
file.write(dns.inet.inet_pton(dns.inet.AF_INET6, self.gateway))
elif self.gateway_type == 3:
self.gateway.to_wire(file, None, origin)
else:
raise ValueError('invalid gateway type')
Gateway(self.gateway_type, self.gateway).to_wire(file, compress,
origin, canonicalize)
file.write(self.key)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
if rdlen < 3:
raise dns.exception.FormError
header = struct.unpack('!BBB', wire[current: current + 3])
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
header = parser.get_struct('!BBB')
gateway_type = header[1]
current += 3
rdlen -= 3
if gateway_type == 0:
gateway = None
elif gateway_type == 1:
gateway = dns.inet.inet_ntop(dns.inet.AF_INET,
wire[current: current + 4])
current += 4
rdlen -= 4
elif gateway_type == 2:
gateway = dns.inet.inet_ntop(dns.inet.AF_INET6,
wire[current: current + 16])
current += 16
rdlen -= 16
elif gateway_type == 3:
(gateway, cused) = dns.name.from_wire(wire[: current + rdlen],
current)
current += cused
rdlen -= cused
else:
raise dns.exception.FormError('invalid IPSECKEY gateway type')
key = wire[current: current + rdlen].unwrap()
gateway = Gateway(gateway_type).from_wire_parser(parser, origin)
key = parser.get_remaining()
return cls(rdclass, rdtype, header[0], gateway_type, header[2],
gateway, key)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -16,6 +18,6 @@
import dns.rdtypes.mxbase
class KX(dns.rdtypes.mxbase.UncompressedMX):
class KX(dns.rdtypes.mxbase.UncompressedDowncasingMX):
"""KX record"""

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -18,7 +20,6 @@ import struct
import dns.exception
import dns.name
import dns.rdata
from dns._compat import xrange, text_type
def _write_string(file, s):
@ -29,41 +30,29 @@ def _write_string(file, s):
def _sanitize(value):
if isinstance(value, text_type):
if isinstance(value, str):
return value.encode()
return value
class NAPTR(dns.rdata.Rdata):
"""NAPTR record
"""NAPTR record"""
@ivar order: order
@type order: int
@ivar preference: preference
@type preference: int
@ivar flags: flags
@type flags: string
@ivar service: service
@type service: string
@ivar regexp: regular expression
@type regexp: string
@ivar replacement: replacement name
@type replacement: dns.name.Name object
@see: RFC 3403"""
# see: RFC 3403
__slots__ = ['order', 'preference', 'flags', 'service', 'regexp',
'replacement']
def __init__(self, rdclass, rdtype, order, preference, flags, service,
regexp, replacement):
super(NAPTR, self).__init__(rdclass, rdtype)
self.flags = _sanitize(flags)
self.service = _sanitize(service)
self.regexp = _sanitize(regexp)
self.order = order
self.preference = preference
self.replacement = replacement
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'flags', _sanitize(flags))
object.__setattr__(self, 'service', _sanitize(service))
object.__setattr__(self, 'regexp', _sanitize(regexp))
object.__setattr__(self, 'order', order)
object.__setattr__(self, 'preference', preference)
object.__setattr__(self, 'replacement', replacement)
def to_text(self, origin=None, relativize=True, **kw):
replacement = self.replacement.choose_relativity(origin, relativize)
@ -75,51 +64,33 @@ class NAPTR(dns.rdata.Rdata):
replacement)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
order = tok.get_uint16()
preference = tok.get_uint16()
flags = tok.get_string()
service = tok.get_string()
regexp = tok.get_string()
replacement = tok.get_name()
replacement = replacement.choose_relativity(origin, relativize)
replacement = tok.get_name(origin, relativize, relativize_to)
tok.get_eol()
return cls(rdclass, rdtype, order, preference, flags, service,
regexp, replacement)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
two_ints = struct.pack("!HH", self.order, self.preference)
file.write(two_ints)
_write_string(file, self.flags)
_write_string(file, self.service)
_write_string(file, self.regexp)
self.replacement.to_wire(file, compress, origin)
self.replacement.to_wire(file, compress, origin, canonicalize)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
(order, preference) = struct.unpack('!HH', wire[current: current + 4])
current += 4
rdlen -= 4
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(order, preference) = parser.get_struct('!HH')
strings = []
for i in xrange(3):
l = wire[current]
current += 1
rdlen -= 1
if l > rdlen or rdlen < 0:
raise dns.exception.FormError
s = wire[current: current + l].unwrap()
current += l
rdlen -= l
for i in range(3):
s = parser.get_counted_bytes()
strings.append(s)
(replacement, cused) = dns.name.from_wire(wire[: current + rdlen],
current)
if cused != rdlen:
raise dns.exception.FormError
if origin is not None:
replacement = replacement.relativize(origin)
replacement = parser.get_name(origin)
return cls(rdclass, rdtype, order, preference, strings[0], strings[1],
strings[2], replacement)
def choose_relativity(self, origin=None, relativize=True):
self.replacement = self.replacement.choose_relativity(origin,
relativize)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -22,23 +24,22 @@ import dns.tokenizer
class NSAP(dns.rdata.Rdata):
"""NSAP record.
"""NSAP record."""
@ivar address: a NASP
@type address: string
@see: RFC 1706"""
# see: RFC 1706
__slots__ = ['address']
def __init__(self, rdclass, rdtype, address):
super(NSAP, self).__init__(rdclass, rdtype)
self.address = address
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'address', address)
def to_text(self, origin=None, relativize=True, **kw):
return "0x%s" % binascii.hexlify(self.address).decode()
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
address = tok.get_string()
tok.get_eol()
if address[0:2] != '0x':
@ -49,11 +50,10 @@ class NSAP(dns.rdata.Rdata):
address = binascii.unhexlify(address.encode())
return cls(rdclass, rdtype, address)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(self.address)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
address = wire[current: current + rdlen].unwrap()
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
address = parser.get_remaining()
return cls(rdclass, rdtype, address)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -22,23 +24,17 @@ import dns.name
class PX(dns.rdata.Rdata):
"""PX record.
"""PX record."""
@ivar preference: the preference value
@type preference: int
@ivar map822: the map822 name
@type map822: dns.name.Name object
@ivar mapx400: the mapx400 name
@type mapx400: dns.name.Name object
@see: RFC 2163"""
# see: RFC 2163
__slots__ = ['preference', 'map822', 'mapx400']
def __init__(self, rdclass, rdtype, preference, map822, mapx400):
super(PX, self).__init__(rdclass, rdtype)
self.preference = preference
self.map822 = map822
self.mapx400 = mapx400
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'preference', preference)
object.__setattr__(self, 'map822', map822)
object.__setattr__(self, 'mapx400', mapx400)
def to_text(self, origin=None, relativize=True, **kw):
map822 = self.map822.choose_relativity(origin, relativize)
@ -46,42 +42,23 @@ class PX(dns.rdata.Rdata):
return '%d %s %s' % (self.preference, map822, mapx400)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
preference = tok.get_uint16()
map822 = tok.get_name()
map822 = map822.choose_relativity(origin, relativize)
mapx400 = tok.get_name(None)
mapx400 = mapx400.choose_relativity(origin, relativize)
map822 = tok.get_name(origin, relativize, relativize_to)
mapx400 = tok.get_name(origin, relativize, relativize_to)
tok.get_eol()
return cls(rdclass, rdtype, preference, map822, mapx400)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
pref = struct.pack("!H", self.preference)
file.write(pref)
self.map822.to_wire(file, None, origin)
self.mapx400.to_wire(file, None, origin)
self.map822.to_wire(file, None, origin, canonicalize)
self.mapx400.to_wire(file, None, origin, canonicalize)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
(preference, ) = struct.unpack('!H', wire[current: current + 2])
current += 2
rdlen -= 2
(map822, cused) = dns.name.from_wire(wire[: current + rdlen],
current)
if cused > rdlen:
raise dns.exception.FormError
current += cused
rdlen -= cused
if origin is not None:
map822 = map822.relativize(origin)
(mapx400, cused) = dns.name.from_wire(wire[: current + rdlen],
current)
if cused != rdlen:
raise dns.exception.FormError
if origin is not None:
mapx400 = mapx400.relativize(origin)
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
preference = parser.get_uint16()
map822 = parser.get_name(origin)
mapx400 = parser.get_name(origin)
return cls(rdclass, rdtype, preference, map822, mapx400)
def choose_relativity(self, origin=None, relativize=True):
self.map822 = self.map822.choose_relativity(origin, relativize)
self.mapx400 = self.mapx400.choose_relativity(origin, relativize)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -22,26 +24,18 @@ import dns.name
class SRV(dns.rdata.Rdata):
"""SRV record
"""SRV record"""
@ivar priority: the priority
@type priority: int
@ivar weight: the weight
@type weight: int
@ivar port: the port of the service
@type port: int
@ivar target: the target host
@type target: dns.name.Name object
@see: RFC 2782"""
# see: RFC 2782
__slots__ = ['priority', 'weight', 'port', 'target']
def __init__(self, rdclass, rdtype, priority, weight, port, target):
super(SRV, self).__init__(rdclass, rdtype)
self.priority = priority
self.weight = weight
self.port = port
self.target = target
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'priority', priority)
object.__setattr__(self, 'weight', weight)
object.__setattr__(self, 'port', port)
object.__setattr__(self, 'target', target)
def to_text(self, origin=None, relativize=True, **kw):
target = self.target.choose_relativity(origin, relativize)
@ -49,33 +43,22 @@ class SRV(dns.rdata.Rdata):
target)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
priority = tok.get_uint16()
weight = tok.get_uint16()
port = tok.get_uint16()
target = tok.get_name(None)
target = target.choose_relativity(origin, relativize)
target = tok.get_name(origin, relativize, relativize_to)
tok.get_eol()
return cls(rdclass, rdtype, priority, weight, port, target)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
three_ints = struct.pack("!HHH", self.priority, self.weight, self.port)
file.write(three_ints)
self.target.to_wire(file, compress, origin)
self.target.to_wire(file, compress, origin, canonicalize)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
(priority, weight, port) = struct.unpack('!HHH',
wire[current: current + 6])
current += 6
rdlen -= 6
(target, cused) = dns.name.from_wire(wire[: current + rdlen],
current)
if cused != rdlen:
raise dns.exception.FormError
if origin is not None:
target = target.relativize(origin)
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
(priority, weight, port) = parser.get_struct('!HHH')
target = parser.get_name(origin)
return cls(rdclass, rdtype, priority, weight, port, target)
def choose_relativity(self, origin=None, relativize=True):
self.target = self.target.choose_relativity(origin, relativize)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -18,7 +20,6 @@ import struct
import dns.ipv4
import dns.rdata
from dns._compat import xrange
_proto_tcp = socket.getprotobyname('tcp')
_proto_udp = socket.getprotobyname('udp')
@ -26,39 +27,31 @@ _proto_udp = socket.getprotobyname('udp')
class WKS(dns.rdata.Rdata):
"""WKS record
"""WKS record"""
@ivar address: the address
@type address: string
@ivar protocol: the protocol
@type protocol: int
@ivar bitmap: the bitmap
@type bitmap: string
@see: RFC 1035"""
# see: RFC 1035
__slots__ = ['address', 'protocol', 'bitmap']
def __init__(self, rdclass, rdtype, address, protocol, bitmap):
super(WKS, self).__init__(rdclass, rdtype)
self.address = address
self.protocol = protocol
if not isinstance(bitmap, bytearray):
self.bitmap = bytearray(bitmap)
else:
self.bitmap = bitmap
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'address', address)
object.__setattr__(self, 'protocol', protocol)
object.__setattr__(self, 'bitmap', dns.rdata._constify(bitmap))
def to_text(self, origin=None, relativize=True, **kw):
bits = []
for i in xrange(0, len(self.bitmap)):
for i in range(0, len(self.bitmap)):
byte = self.bitmap[i]
for j in xrange(0, 8):
for j in range(0, 8):
if byte & (0x80 >> j):
bits.append(str(i * 8 + j))
text = ' '.join(bits)
return '%s %d %s' % (self.address, self.protocol, text)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
address = tok.get_string()
protocol = tok.get_string()
if protocol.isdigit():
@ -83,24 +76,21 @@ class WKS(dns.rdata.Rdata):
i = serv // 8
l = len(bitmap)
if l < i + 1:
for j in xrange(l, i + 1):
for j in range(l, i + 1):
bitmap.append(0)
bitmap[i] = bitmap[i] | (0x80 >> (serv % 8))
bitmap = dns.rdata._truncate_bitmap(bitmap)
return cls(rdclass, rdtype, address, protocol, bitmap)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(dns.ipv4.inet_aton(self.address))
protocol = struct.pack('!B', self.protocol)
file.write(protocol)
file.write(self.bitmap)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
address = dns.ipv4.inet_ntoa(wire[current: current + 4])
protocol, = struct.unpack('!B', wire[current + 4: current + 5])
current += 5
rdlen -= 5
bitmap = wire[current: current + rdlen].unwrap()
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
address = dns.ipv4.inet_ntoa(parser.get_bytes(4))
protocol = parser.get_uint8()
bitmap = parser.get_remaining()
return cls(rdclass, rdtype, address, protocol, bitmap)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -20,6 +22,7 @@ __all__ = [
'AAAA',
'APL',
'DHCID',
'IPSECKEY',
'KX',
'NAPTR',
'NSAP',

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -18,7 +20,9 @@
__all__ = [
'ANY',
'IN',
'CH',
'euibase',
'mxbase',
'nsbase',
'util'
]

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2004-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -14,6 +16,7 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import base64
import enum
import struct
import dns.exception
@ -21,116 +24,51 @@ import dns.dnssec
import dns.rdata
# wildcard import
__all__ = ["SEP", "REVOKE", "ZONE",
"flags_to_text_set", "flags_from_text_set"]
__all__ = ["SEP", "REVOKE", "ZONE"] # noqa: F822
# flag constants
SEP = 0x0001
REVOKE = 0x0080
ZONE = 0x0100
class Flag(enum.IntFlag):
SEP = 0x0001
REVOKE = 0x0080
ZONE = 0x0100
_flag_by_text = {
'SEP': SEP,
'REVOKE': REVOKE,
'ZONE': ZONE
}
# We construct the inverse mapping programmatically to ensure that we
# cannot make any mistakes (e.g. omissions, cut-and-paste errors) that
# would cause the mapping not to be true inverse.
_flag_by_value = dict((y, x) for x, y in _flag_by_text.items())
def flags_to_text_set(flags):
"""Convert a DNSKEY flags value to set texts
@rtype: set([string])"""
flags_set = set()
mask = 0x1
while mask <= 0x8000:
if flags & mask:
text = _flag_by_value.get(mask)
if not text:
text = hex(mask)
flags_set.add(text)
mask <<= 1
return flags_set
def flags_from_text_set(texts_set):
"""Convert set of DNSKEY flag mnemonic texts to DNSKEY flag value
@rtype: int"""
flags = 0
for text in texts_set:
try:
flags += _flag_by_text[text]
except KeyError:
raise NotImplementedError(
"DNSKEY flag '%s' is not supported" % text)
return flags
globals().update(Flag.__members__)
class DNSKEYBase(dns.rdata.Rdata):
"""Base class for rdata that is like a DNSKEY record
@ivar flags: the key flags
@type flags: int
@ivar protocol: the protocol for which this key may be used
@type protocol: int
@ivar algorithm: the algorithm used for the key
@type algorithm: int
@ivar key: the public key
@type key: string"""
"""Base class for rdata that is like a DNSKEY record"""
__slots__ = ['flags', 'protocol', 'algorithm', 'key']
def __init__(self, rdclass, rdtype, flags, protocol, algorithm, key):
super(DNSKEYBase, self).__init__(rdclass, rdtype)
self.flags = flags
self.protocol = protocol
self.algorithm = algorithm
self.key = key
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'flags', flags)
object.__setattr__(self, 'protocol', protocol)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'key', key)
def to_text(self, origin=None, relativize=True, **kw):
return '%d %d %d %s' % (self.flags, self.protocol, self.algorithm,
dns.rdata._base64ify(self.key))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
flags = tok.get_uint16()
protocol = tok.get_uint8()
algorithm = dns.dnssec.algorithm_from_text(tok.get_string())
chunks = []
while 1:
t = tok.get().unescape()
if t.is_eol_or_eof():
break
if not t.is_identifier():
raise dns.exception.SyntaxError
chunks.append(t.value.encode())
b64 = b''.join(chunks)
b64 = tok.concatenate_remaining_identifiers().encode()
key = base64.b64decode(b64)
return cls(rdclass, rdtype, flags, protocol, algorithm, key)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
header = struct.pack("!HBB", self.flags, self.protocol, self.algorithm)
file.write(header)
file.write(self.key)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
if rdlen < 4:
raise dns.exception.FormError
header = struct.unpack('!HBB', wire[current: current + 4])
current += 4
rdlen -= 4
key = wire[current: current + rdlen].unwrap()
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
header = parser.get_struct('!HBB')
key = parser.get_remaining()
return cls(rdclass, rdtype, header[0], header[1], header[2],
key)
def flags_to_text_set(self):
"""Convert a DNSKEY flags value to set texts
@rtype: set([string])"""
return flags_to_text_set(self.flags)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2010, 2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -16,33 +18,24 @@
import struct
import binascii
import dns.dnssec
import dns.rdata
import dns.rdatatype
class DSBase(dns.rdata.Rdata):
"""Base class for rdata that is like a DS record
@ivar key_tag: the key tag
@type key_tag: int
@ivar algorithm: the algorithm
@type algorithm: int
@ivar digest_type: the digest type
@type digest_type: int
@ivar digest: the digest
@type digest: int
@see: draft-ietf-dnsext-delegation-signer-14.txt"""
"""Base class for rdata that is like a DS record"""
__slots__ = ['key_tag', 'algorithm', 'digest_type', 'digest']
def __init__(self, rdclass, rdtype, key_tag, algorithm, digest_type,
digest):
super(DSBase, self).__init__(rdclass, rdtype)
self.key_tag = key_tag
self.algorithm = algorithm
self.digest_type = digest_type
self.digest = digest
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'key_tag', key_tag)
object.__setattr__(self, 'algorithm', algorithm)
object.__setattr__(self, 'digest_type', digest_type)
object.__setattr__(self, 'digest', digest)
def to_text(self, origin=None, relativize=True, **kw):
return '%d %d %d %s' % (self.key_tag, self.algorithm,
@ -51,34 +44,24 @@ class DSBase(dns.rdata.Rdata):
chunksize=128))
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
key_tag = tok.get_uint16()
algorithm = tok.get_uint8()
algorithm = dns.dnssec.algorithm_from_text(tok.get_string())
digest_type = tok.get_uint8()
chunks = []
while 1:
t = tok.get().unescape()
if t.is_eol_or_eof():
break
if not t.is_identifier():
raise dns.exception.SyntaxError
chunks.append(t.value.encode())
digest = b''.join(chunks)
digest = tok.concatenate_remaining_identifiers().encode()
digest = binascii.unhexlify(digest)
return cls(rdclass, rdtype, key_tag, algorithm, digest_type,
digest)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
header = struct.pack("!HBB", self.key_tag, self.algorithm,
self.digest_type)
file.write(header)
file.write(self.digest)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
header = struct.unpack("!HBB", wire[current: current + 4])
current += 4
rdlen -= 4
digest = wire[current: current + rdlen].unwrap()
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
header = parser.get_struct("!HBB")
digest = parser.get_remaining()
return cls(rdclass, rdtype, header[0], header[1], header[2], digest)

View file

@ -21,11 +21,9 @@ import dns.rdata
class EUIBase(dns.rdata.Rdata):
"""EUIxx record
"""EUIxx record"""
@ivar fingerprint: xx-bit Extended Unique Identifier (EUI-xx)
@type fingerprint: string
@see: rfc7043.txt"""
# see: rfc7043.txt
__slots__ = ['eui']
# define these in subclasses
@ -33,24 +31,24 @@ class EUIBase(dns.rdata.Rdata):
# text_len = byte_len * 3 - 1 # 01-23-45-67-89-ab
def __init__(self, rdclass, rdtype, eui):
super(EUIBase, self).__init__(rdclass, rdtype)
super().__init__(rdclass, rdtype)
if len(eui) != self.byte_len:
raise dns.exception.FormError('EUI%s rdata has to have %s bytes'
% (self.byte_len * 8, self.byte_len))
self.eui = eui
object.__setattr__(self, 'eui', eui)
def to_text(self, origin=None, relativize=True, **kw):
return dns.rdata._hexify(self.eui, chunksize=2).replace(' ', '-')
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
text = tok.get_string()
tok.get_eol()
if len(text) != cls.text_len:
raise dns.exception.SyntaxError(
'Input text must have %s characters' % cls.text_len)
expected_dash_idxs = range(2, cls.byte_len * 3 - 1, 3)
for i in expected_dash_idxs:
for i in range(2, cls.byte_len * 3 - 1, 3):
if text[i] != '-':
raise dns.exception.SyntaxError('Dash expected at position %s'
% i)
@ -61,11 +59,10 @@ class EUIBase(dns.rdata.Rdata):
raise dns.exception.SyntaxError('Hex decoding error: %s' % str(ex))
return cls(rdclass, rdtype, data)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
file.write(self.eui)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
eui = wire[current:current + rdlen].unwrap()
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
eui = parser.get_bytes(cls.byte_len)
return cls(rdclass, rdtype, eui)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -15,7 +17,6 @@
"""MX-like base classes."""
from io import BytesIO
import struct
import dns.exception
@ -25,57 +26,38 @@ import dns.name
class MXBase(dns.rdata.Rdata):
"""Base class for rdata that is like an MX record.
@ivar preference: the preference value
@type preference: int
@ivar exchange: the exchange name
@type exchange: dns.name.Name object"""
"""Base class for rdata that is like an MX record."""
__slots__ = ['preference', 'exchange']
def __init__(self, rdclass, rdtype, preference, exchange):
super(MXBase, self).__init__(rdclass, rdtype)
self.preference = preference
self.exchange = exchange
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'preference', preference)
object.__setattr__(self, 'exchange', exchange)
def to_text(self, origin=None, relativize=True, **kw):
exchange = self.exchange.choose_relativity(origin, relativize)
return '%d %s' % (self.preference, exchange)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
preference = tok.get_uint16()
exchange = tok.get_name()
exchange = exchange.choose_relativity(origin, relativize)
exchange = tok.get_name(origin, relativize, relativize_to)
tok.get_eol()
return cls(rdclass, rdtype, preference, exchange)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
pref = struct.pack("!H", self.preference)
file.write(pref)
self.exchange.to_wire(file, compress, origin)
def to_digestable(self, origin=None):
return struct.pack("!H", self.preference) + \
self.exchange.to_digestable(origin)
self.exchange.to_wire(file, compress, origin, canonicalize)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
(preference, ) = struct.unpack('!H', wire[current: current + 2])
current += 2
rdlen -= 2
(exchange, cused) = dns.name.from_wire(wire[: current + rdlen],
current)
if cused != rdlen:
raise dns.exception.FormError
if origin is not None:
exchange = exchange.relativize(origin)
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
preference = parser.get_uint16()
exchange = parser.get_name(origin)
return cls(rdclass, rdtype, preference, exchange)
def choose_relativity(self, origin=None, relativize=True):
self.exchange = self.exchange.choose_relativity(origin, relativize)
class UncompressedMX(MXBase):
@ -83,13 +65,8 @@ class UncompressedMX(MXBase):
is not compressed when converted to DNS wire format, and whose
digestable form is not downcased."""
def to_wire(self, file, compress=None, origin=None):
super(UncompressedMX, self).to_wire(file, None, origin)
def to_digestable(self, origin=None):
f = BytesIO()
self.to_wire(f, None, origin)
return f.getvalue()
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
super()._to_wire(file, None, origin, False)
class UncompressedDowncasingMX(MXBase):
@ -97,5 +74,5 @@ class UncompressedDowncasingMX(MXBase):
"""Base class for rdata that is like an MX record, but whose name
is not compressed when convert to DNS wire format."""
def to_wire(self, file, compress=None, origin=None):
super(UncompressedDowncasingMX, self).to_wire(file, None, origin)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
super()._to_wire(file, None, origin, canonicalize)

View file

@ -1,3 +1,5 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
@ -15,8 +17,6 @@
"""NS-like base classes."""
from io import BytesIO
import dns.exception
import dns.rdata
import dns.name
@ -24,47 +24,33 @@ import dns.name
class NSBase(dns.rdata.Rdata):
"""Base class for rdata that is like an NS record.
@ivar target: the target name of the rdata
@type target: dns.name.Name object"""
"""Base class for rdata that is like an NS record."""
__slots__ = ['target']
def __init__(self, rdclass, rdtype, target):
super(NSBase, self).__init__(rdclass, rdtype)
self.target = target
super().__init__(rdclass, rdtype)
object.__setattr__(self, 'target', target)
def to_text(self, origin=None, relativize=True, **kw):
target = self.target.choose_relativity(origin, relativize)
return str(target)
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
target = tok.get_name()
target = target.choose_relativity(origin, relativize)
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
target = tok.get_name(origin, relativize, relativize_to)
tok.get_eol()
return cls(rdclass, rdtype, target)
def to_wire(self, file, compress=None, origin=None):
self.target.to_wire(file, compress, origin)
def to_digestable(self, origin=None):
return self.target.to_digestable(origin)
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
self.target.to_wire(file, compress, origin, canonicalize)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
(target, cused) = dns.name.from_wire(wire[: current + rdlen],
current)
if cused != rdlen:
raise dns.exception.FormError
if origin is not None:
target = target.relativize(origin)
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
target = parser.get_name(origin)
return cls(rdclass, rdtype, target)
def choose_relativity(self, origin=None, relativize=True):
self.target = self.target.choose_relativity(origin, relativize)
class UncompressedNS(NSBase):
@ -72,10 +58,5 @@ class UncompressedNS(NSBase):
is not compressed when convert to DNS wire format, and whose
digestable form is not downcased."""
def to_wire(self, file, compress=None, origin=None):
super(UncompressedNS, self).to_wire(file, None, origin)
def to_digestable(self, origin=None):
f = BytesIO()
self.to_wire(f, None, origin)
return f.getvalue()
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
self.target.to_wire(file, None, origin, False)

View file

@ -1,4 +1,6 @@
# Copyright (C) 2006, 2007, 2009-2011 Nominum, Inc.
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2006-2017 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
@ -20,54 +22,61 @@ import struct
import dns.exception
import dns.rdata
import dns.tokenizer
from dns._compat import binary_type
class TXTBase(dns.rdata.Rdata):
"""Base class for rdata that is like a TXT record
@ivar strings: the text strings
@type strings: list of string
@see: RFC 1035"""
"""Base class for rdata that is like a TXT record (see RFC 1035)."""
__slots__ = ['strings']
def __init__(self, rdclass, rdtype, strings):
super(TXTBase, self).__init__(rdclass, rdtype)
if isinstance(strings, str):
strings = [strings]
self.strings = strings[:]
"""Initialize a TXT-like rdata.
*rdclass*, an ``int`` is the rdataclass of the Rdata.
*rdtype*, an ``int`` is the rdatatype of the Rdata.
*strings*, a tuple of ``bytes``
"""
super().__init__(rdclass, rdtype)
if isinstance(strings, (bytes, str)):
strings = (strings,)
encoded_strings = []
for string in strings:
if isinstance(string, str):
string = string.encode()
else:
string = dns.rdata._constify(string)
encoded_strings.append(string)
object.__setattr__(self, 'strings', tuple(encoded_strings))
def to_text(self, origin=None, relativize=True, **kw):
txt = ''
prefix = ''
for s in self.strings:
txt += '%s"%s"' % (prefix, dns.rdata._escapify(s))
txt += '{}"{}"'.format(prefix, dns.rdata._escapify(s))
prefix = ' '
return txt
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True,
relativize_to=None):
strings = []
while 1:
token = tok.get().unescape()
token = tok.get().unescape_to_bytes()
if token.is_eol_or_eof():
break
if not (token.is_quoted_string() or token.is_identifier()):
raise dns.exception.SyntaxError("expected a string")
if len(token.value) > 255:
raise dns.exception.SyntaxError("string too long")
value = token.value
if isinstance(value, binary_type):
strings.append(value)
else:
strings.append(value.encode())
strings.append(token.value)
if len(strings) == 0:
raise dns.exception.UnexpectedEnd
return cls(rdclass, rdtype, strings)
def to_wire(self, file, compress=None, origin=None):
def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
for s in self.strings:
l = len(s)
assert l < 256
@ -75,17 +84,9 @@ class TXTBase(dns.rdata.Rdata):
file.write(s)
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
strings = []
while rdlen > 0:
l = wire[current]
current += 1
rdlen -= 1
if l > rdlen:
raise dns.exception.FormError
s = wire[current: current + l].unwrap()
current += l
rdlen -= l
while parser.remaining() > 0:
s = parser.get_counted_bytes()
strings.append(s)
return cls(rdclass, rdtype, strings)

166
lib/dns/rdtypes/util.py Normal file
View file

@ -0,0 +1,166 @@
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
# Copyright (C) 2006, 2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import struct
import dns.exception
import dns.name
import dns.ipv4
import dns.ipv6
class Gateway:
"""A helper class for the IPSECKEY gateway and AMTRELAY relay fields"""
name = ""
def __init__(self, type, gateway=None):
self.type = type
self.gateway = gateway
def _invalid_type(self):
return f"invalid {self.name} type: {self.type}"
def check(self):
if self.type == 0:
if self.gateway not in (".", None):
raise SyntaxError(f"invalid {self.name} for type 0")
self.gateway = None
elif self.type == 1:
# check that it's OK
dns.ipv4.inet_aton(self.gateway)
elif self.type == 2:
# check that it's OK
dns.ipv6.inet_aton(self.gateway)
elif self.type == 3:
if not isinstance(self.gateway, dns.name.Name):
raise SyntaxError(f"invalid {self.name}; not a name")
else:
raise SyntaxError(self._invalid_type())
def to_text(self, origin=None, relativize=True):
if self.type == 0:
return "."
elif self.type in (1, 2):
return self.gateway
elif self.type == 3:
return str(self.gateway.choose_relativity(origin, relativize))
else:
raise ValueError(self._invalid_type())
def from_text(self, tok, origin=None, relativize=True, relativize_to=None):
if self.type in (0, 1, 2):
return tok.get_string()
elif self.type == 3:
return tok.get_name(origin, relativize, relativize_to)
else:
raise dns.exception.SyntaxError(self._invalid_type())
def to_wire(self, file, compress=None, origin=None, canonicalize=False):
if self.type == 0:
pass
elif self.type == 1:
file.write(dns.ipv4.inet_aton(self.gateway))
elif self.type == 2:
file.write(dns.ipv6.inet_aton(self.gateway))
elif self.type == 3:
self.gateway.to_wire(file, None, origin, False)
else:
raise ValueError(self._invalid_type())
def from_wire_parser(self, parser, origin=None):
if self.type == 0:
return None
elif self.type == 1:
return dns.ipv4.inet_ntoa(parser.get_bytes(4))
elif self.type == 2:
return dns.ipv6.inet_ntoa(parser.get_bytes(16))
elif self.type == 3:
return parser.get_name(origin)
else:
raise dns.exception.FormError(self._invalid_type())
class Bitmap:
"""A helper class for the NSEC/NSEC3/CSYNC type bitmaps"""
type_name = ""
def __init__(self, windows=None):
self.windows = windows
def to_text(self):
text = ""
for (window, bitmap) in self.windows:
bits = []
for (i, byte) in enumerate(bitmap):
for j in range(0, 8):
if byte & (0x80 >> j):
rdtype = window * 256 + i * 8 + j
bits.append(dns.rdatatype.to_text(rdtype))
text += (' ' + ' '.join(bits))
return text
def from_text(self, tok):
rdtypes = []
while True:
token = tok.get().unescape()
if token.is_eol_or_eof():
break
rdtype = dns.rdatatype.from_text(token.value)
if rdtype == 0:
raise dns.exception.SyntaxError(f"{self.type_name} with bit 0")
rdtypes.append(rdtype)
rdtypes.sort()
window = 0
octets = 0
prior_rdtype = 0
bitmap = bytearray(b'\0' * 32)
windows = []
for rdtype in rdtypes:
if rdtype == prior_rdtype:
continue
prior_rdtype = rdtype
new_window = rdtype // 256
if new_window != window:
if octets != 0:
windows.append((window, bitmap[0:octets]))
bitmap = bytearray(b'\0' * 32)
window = new_window
offset = rdtype % 256
byte = offset // 8
bit = offset % 8
octets = byte + 1
bitmap[byte] = bitmap[byte] | (0x80 >> bit)
if octets != 0:
windows.append((window, bitmap[0:octets]))
return windows
def to_wire(self, file):
for (window, bitmap) in self.windows:
file.write(struct.pack('!BB', window, len(bitmap)))
file.write(bitmap)
def from_wire_parser(self, parser):
windows = []
last_window = -1
while parser.remaining() > 0:
window = parser.get_uint8()
if window <= last_window:
raise dns.exception.FormError(f"bad {self.type_name} bitmap")
bitmap = parser.get_counted_bytes()
if len(bitmap) == 0 or len(bitmap) > 32:
raise dns.exception.FormError(f"bad {self.type_name} octets")
windows.append((window, bitmap))
last_window = window
return windows