Bump dnspython from 2.6.1 to 2.7.0 (#2440)

* Bump dnspython from 2.6.1 to 2.7.0

Bumps [dnspython](https://github.com/rthalley/dnspython) from 2.6.1 to 2.7.0.
- [Release notes](https://github.com/rthalley/dnspython/releases)
- [Changelog](https://github.com/rthalley/dnspython/blob/main/doc/whatsnew.rst)
- [Commits](https://github.com/rthalley/dnspython/compare/v2.6.1...v2.7.0)

---
updated-dependencies:
- dependency-name: dnspython
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Update dnspython==2.7.0

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com>

[skip ci]
This commit is contained in:
dependabot[bot] 2024-11-19 10:00:50 -08:00 committed by GitHub
parent 0836fb902c
commit feca713b76
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
56 changed files with 1382 additions and 665 deletions

View file

@ -18,9 +18,10 @@
"""DNS Messages"""
import contextlib
import enum
import io
import time
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union, cast
import dns.edns
import dns.entropy
@ -161,6 +162,7 @@ class Message:
self.index: IndexType = {}
self.errors: List[MessageError] = []
self.time = 0.0
self.wire: Optional[bytes] = None
@property
def question(self) -> List[dns.rrset.RRset]:
@ -220,16 +222,16 @@ class Message:
s = io.StringIO()
s.write("id %d\n" % self.id)
s.write("opcode %s\n" % dns.opcode.to_text(self.opcode()))
s.write("rcode %s\n" % dns.rcode.to_text(self.rcode()))
s.write("flags %s\n" % dns.flags.to_text(self.flags))
s.write(f"opcode {dns.opcode.to_text(self.opcode())}\n")
s.write(f"rcode {dns.rcode.to_text(self.rcode())}\n")
s.write(f"flags {dns.flags.to_text(self.flags)}\n")
if self.edns >= 0:
s.write("edns %s\n" % self.edns)
s.write(f"edns {self.edns}\n")
if self.ednsflags != 0:
s.write("eflags %s\n" % dns.flags.edns_to_text(self.ednsflags))
s.write(f"eflags {dns.flags.edns_to_text(self.ednsflags)}\n")
s.write("payload %d\n" % self.payload)
for opt in self.options:
s.write("option %s\n" % opt.to_text())
s.write(f"option {opt.to_text()}\n")
for name, which in self._section_enum.__members__.items():
s.write(f";{name}\n")
for rrset in self.section_from_number(which):
@ -645,6 +647,7 @@ class Message:
if multi:
self.tsig_ctx = ctx
wire = r.get_wire()
self.wire = wire
if prepend_length:
wire = len(wire).to_bytes(2, "big") + wire
return wire
@ -912,6 +915,14 @@ class Message:
self.flags &= 0x87FF
self.flags |= dns.opcode.to_flags(opcode)
def get_options(self, otype: dns.edns.OptionType) -> List[dns.edns.Option]:
"""Return the list of options of the specified type."""
return [option for option in self.options if option.otype == otype]
def extended_errors(self) -> List[dns.edns.EDEOption]:
"""Return the list of Extended DNS Error (EDE) options in the message"""
return cast(List[dns.edns.EDEOption], self.get_options(dns.edns.OptionType.EDE))
def _get_one_rr_per_rrset(self, value):
# What the caller picked is fine.
return value
@ -1192,9 +1203,9 @@ class _WireReader:
if rdtype == dns.rdatatype.OPT:
self.message.opt = dns.rrset.from_rdata(name, ttl, rd)
elif rdtype == dns.rdatatype.TSIG:
if self.keyring is None:
if self.keyring is None or self.keyring is True:
raise UnknownTSIGKey("got signed message without keyring")
if isinstance(self.keyring, dict):
elif isinstance(self.keyring, dict):
key = self.keyring.get(absolute_name)
if isinstance(key, bytes):
key = dns.tsig.Key(absolute_name, key, rd.algorithm)
@ -1203,19 +1214,20 @@ class _WireReader:
else:
key = self.keyring
if key is None:
raise UnknownTSIGKey("key '%s' unknown" % name)
self.message.keyring = key
self.message.tsig_ctx = dns.tsig.validate(
self.parser.wire,
key,
absolute_name,
rd,
int(time.time()),
self.message.request_mac,
rr_start,
self.message.tsig_ctx,
self.multi,
)
raise UnknownTSIGKey(f"key '{name}' unknown")
if key:
self.message.keyring = key
self.message.tsig_ctx = dns.tsig.validate(
self.parser.wire,
key,
absolute_name,
rd,
int(time.time()),
self.message.request_mac,
rr_start,
self.message.tsig_ctx,
self.multi,
)
self.message.tsig = dns.rrset.from_rdata(absolute_name, 0, rd)
else:
rrset = self.message.find_rrset(
@ -1251,6 +1263,7 @@ class _WireReader:
factory = _message_factory_from_opcode(dns.opcode.from_flags(flags))
self.message = factory(id=id)
self.message.flags = dns.flags.Flag(flags)
self.message.wire = self.parser.wire
self.initialize_message(self.message)
self.one_rr_per_rrset = self.message._get_one_rr_per_rrset(
self.one_rr_per_rrset
@ -1290,8 +1303,10 @@ def from_wire(
) -> Message:
"""Convert a DNS wire format message into a message object.
*keyring*, a ``dns.tsig.Key`` or ``dict``, the key or keyring to use if the message
is signed.
*keyring*, a ``dns.tsig.Key``, ``dict``, ``bool``, or ``None``, the key or keyring
to use if the message is signed. If ``None`` or ``True``, then trying to decode
a message with a TSIG will fail as it cannot be validated. If ``False``, then
TSIG validation is disabled.
*request_mac*, a ``bytes`` or ``None``. If the message is a response to a
TSIG-signed request, *request_mac* should be set to the MAC of that request.
@ -1811,6 +1826,16 @@ def make_query(
return m
class CopyMode(enum.Enum):
"""
How should sections be copied when making an update response?
"""
NOTHING = 0
QUESTION = 1
EVERYTHING = 2
def make_response(
query: Message,
recursion_available: bool = False,
@ -1818,13 +1843,14 @@ def make_response(
fudge: int = 300,
tsig_error: int = 0,
pad: Optional[int] = None,
copy_mode: Optional[CopyMode] = None,
) -> Message:
"""Make a message which is a response for the specified query.
The message returned is really a response skeleton; it has all of the infrastructure
required of a response, but none of the content.
The response's question section is a shallow copy of the query's question section,
so the query's question RRsets should not be changed.
Response section(s) which are copied are shallow copies of the matching section(s)
in the query, so the query's RRsets should not be changed.
*query*, a ``dns.message.Message``, the query to respond to.
@ -1837,25 +1863,44 @@ def make_response(
*tsig_error*, an ``int``, the TSIG error.
*pad*, a non-negative ``int`` or ``None``. If 0, the default, do not pad; otherwise
if not ``None`` add padding bytes to make the message size a multiple of *pad*.
Note that if padding is non-zero, an EDNS PADDING option will always be added to the
if not ``None`` add padding bytes to make the message size a multiple of *pad*. Note
that if padding is non-zero, an EDNS PADDING option will always be added to the
message. If ``None``, add padding following RFC 8467, namely if the request is
padded, pad the response to 468 otherwise do not pad.
*copy_mode*, a ``dns.message.CopyMode`` or ``None``, determines how sections are
copied. The default, ``None`` copies sections according to the default for the
message's opcode, which is currently ``dns.message.CopyMode.QUESTION`` for all
opcodes. ``dns.message.CopyMode.QUESTION`` copies only the question section.
``dns.message.CopyMode.EVERYTHING`` copies all sections other than OPT or TSIG
records, which are created appropriately if needed. ``dns.message.CopyMode.NOTHING``
copies no sections; note that this mode is for server testing purposes and is
otherwise not recommended for use. In particular, ``dns.message.is_response()``
will be ``False`` if you create a response this way and the rcode is not
``FORMERR``, ``SERVFAIL``, ``NOTIMP``, or ``REFUSED``.
Returns a ``dns.message.Message`` object whose specific class is appropriate for the
query. For example, if query is a ``dns.update.UpdateMessage``, response will be
too.
query. For example, if query is a ``dns.update.UpdateMessage``, the response will
be one too.
"""
if query.flags & dns.flags.QR:
raise dns.exception.FormError("specified query message is not a query")
factory = _message_factory_from_opcode(query.opcode())
opcode = query.opcode()
factory = _message_factory_from_opcode(opcode)
response = factory(id=query.id)
response.flags = dns.flags.QR | (query.flags & dns.flags.RD)
if recursion_available:
response.flags |= dns.flags.RA
response.set_opcode(query.opcode())
response.question = list(query.question)
response.set_opcode(opcode)
if copy_mode is None:
copy_mode = CopyMode.QUESTION
if copy_mode != CopyMode.NOTHING:
response.question = list(query.question)
if copy_mode == CopyMode.EVERYTHING:
response.answer = list(query.answer)
response.authority = list(query.authority)
response.additional = list(query.additional)
if query.edns >= 0:
if pad is None:
# Set response padding per RFC 8467