/usr/lib/python3.9/site-packages/dns
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
#
# Support for Discovery of Designated Resolvers

import ipaddress
import socket
import time
from urllib.parse import urlparse

import dns.asyncbackend
import dns.inet
import dns.name
import dns.nameserver
import dns.query
import dns.rdtypes.svcbbase

# The special name of the local resolver when using DDR
_local_resolver_name = dns.name.from_text("_dns.resolver.arpa")

# Default ports used when an SVCB record carries no explicit "port" parameter.
# Each transport has its own default; they must not be shared between ALPNs.
_DEFAULT_DOH_PORT = 443
_DEFAULT_DOT_PORT = 853
_DEFAULT_DOQ_PORT = 853


#
# Processing is split up into I/O independent and I/O dependent parts to
# make supporting sync and async versions easy.
#


def _same_address(left, right):
    """Return True if two textual IP addresses denote the same address.

    Both values are parsed with ``ipaddress.ip_address`` so that different
    spellings of one address (for example compressed vs. uncompressed IPv6)
    compare equal.  If either value cannot be parsed, plain equality is used.
    """
    try:
        return ipaddress.ip_address(left) == ipaddress.ip_address(right)
    except ValueError:
        return left == right


class _SVCBInfo:
    """Nameservers advertised by one SVCB record plus the data needed to
    TLS-validate them against the bootstrap address."""

    def __init__(self, bootstrap_address, port, hostname, nameservers):
        # Address of the resolver that returned the SVCB answer.
        self.bootstrap_address = bootstrap_address
        # Port used for the TLS validation connection.
        self.port = port
        # SVCB target name, used as the TLS server hostname.
        self.hostname = hostname
        # Nameserver objects built from the record.
        self.nameservers = nameservers

    def ddr_check_certificate(self, cert):
        """Verify that the _SVCBInfo's address is in the cert's subjectAltName (SAN)

        *cert* is a certificate dictionary in the form returned by
        ``getpeercert()``.  Returns ``True`` if any "IP Address" SAN entry
        denotes the bootstrap address, ``False`` otherwise (including when
        the certificate is empty or has no SAN).
        """
        if not cert:
            return False
        return any(
            kind == "IP Address" and _same_address(value, self.bootstrap_address)
            for kind, value in cert.get("subjectAltName", ())
        )

    def make_tls_context(self):
        """Return a default SSL context that requires at least TLS 1.2."""
        ssl = dns.query.ssl
        ctx = ssl.create_default_context()
        ctx.minimum_version = ssl.TLSVersion.TLSv1_2
        return ctx

    def ddr_tls_check_sync(self, lifetime):
        """Connect to the bootstrap address over TLS and check its certificate.

        *lifetime* is the time budget in seconds; it is used as the connect
        timeout, and whatever remains of it is used as the handshake timeout.
        Returns the result of ``ddr_check_certificate()`` on the peer
        certificate.  Connection and TLS errors propagate to the caller.
        """
        ctx = self.make_tls_context()
        expiration = time.time() + lifetime
        with socket.create_connection(
            (self.bootstrap_address, self.port), lifetime
        ) as s:
            with ctx.wrap_socket(s, server_hostname=self.hostname) as ts:
                ts.settimeout(dns.query._remaining(expiration))
                ts.do_handshake()
                cert = ts.getpeercert()
                return self.ddr_check_certificate(cert)

    async def ddr_tls_check_async(self, lifetime, backend=None):
        """Asynchronous counterpart of ``ddr_tls_check_sync()``.

        *backend* is the async backend used to create the socket; when it is
        ``None`` the default backend is used.  Returns the result of
        ``ddr_check_certificate()`` on the peer certificate.
        """
        if backend is None:
            backend = dns.asyncbackend.get_default_backend()
        ctx = self.make_tls_context()
        expiration = time.time() + lifetime
        async with await backend.make_socket(
            dns.inet.af_for_address(self.bootstrap_address),
            socket.SOCK_STREAM,
            0,
            None,
            (self.bootstrap_address, self.port),
            lifetime,
            ctx,
            self.hostname,
        ) as ts:
            cert = await ts.getpeercert(dns.query._remaining(expiration))
            return self.ddr_check_certificate(cert)


def _extract_nameservers_from_svcb(answer):
    """Build ``_SVCBInfo`` objects from the records of an SVCB *answer*.

    Returns an empty list if the answering nameserver is not an IP address.
    Records without an ALPN parameter are skipped, as are records that
    advertise "h2" without a DOHPATH ending in ``{?dns}``.

    When a record has no explicit port, each transport gets its own default
    (443 for DoH, 853 for DoT and DoQ).  The port stored on the resulting
    ``_SVCBInfo`` (used for the TLS validation connection) is the explicit
    port if present, otherwise the port of the first nameserver created
    from the record.
    """
    bootstrap_address = answer.nameserver
    if not dns.inet.is_address(bootstrap_address):
        return []
    param_key = dns.rdtypes.svcbbase.ParamKey
    infos = []
    for rr in answer.rrset.processing_order():
        alpn_param = rr.params.get(param_key.ALPN)
        if alpn_param is None:
            continue
        alpns = set(alpn_param.ids)
        host = rr.target.to_text(omit_final_dot=True)
        port_param = rr.params.get(param_key.PORT)
        explicit_port = None if port_param is None else port_param.port

        def port_or(default, explicit_port=explicit_port):
            # An explicit port applies to every transport; otherwise each
            # transport uses its own default.
            return default if explicit_port is None else explicit_port

        nameservers = []
        check_port = None
        # For now we ignore address hints and address resolution and always use the
        # bootstrap address
        if b"h2" in alpns:
            doh_param = rr.params.get(param_key.DOHPATH)
            if doh_param is None or not doh_param.value.endswith(b"{?dns}"):
                continue
            # Strip the trailing "{?dns}" template variable.
            path = doh_param.value[:-6].decode()
            if not path.startswith("/"):
                path = "/" + path
            doh_port = port_or(_DEFAULT_DOH_PORT)
            url = f"https://{host}:{doh_port}{path}"
            # check the URL
            try:
                urlparse(url)
                nameservers.append(dns.nameserver.DoHNameserver(url, bootstrap_address))
                check_port = doh_port
            except Exception:
                # continue processing other ALPN types
                pass
        if b"dot" in alpns:
            dot_port = port_or(_DEFAULT_DOT_PORT)
            nameservers.append(
                dns.nameserver.DoTNameserver(bootstrap_address, dot_port, host)
            )
            if check_port is None:
                check_port = dot_port
        if b"doq" in alpns:
            doq_port = port_or(_DEFAULT_DOQ_PORT)
            nameservers.append(
                dns.nameserver.DoQNameserver(bootstrap_address, doq_port, True, host)
            )
            if check_port is None:
                check_port = doq_port
        if nameservers:
            infos.append(_SVCBInfo(bootstrap_address, check_port, host, nameservers))
    return infos


def _get_nameservers_sync(answer, lifetime):
    """Return a list of TLS-validated resolver nameservers extracted from an SVCB
    answer."""
    nameservers = []
    for info in _extract_nameservers_from_svcb(answer):
        try:
            if info.ddr_tls_check_sync(lifetime):
                nameservers.extend(info.nameservers)
        except Exception:
            # Deliberately best-effort: a resolver that cannot be reached or
            # validated is simply not offered.
            pass
    return nameservers


async def _get_nameservers_async(answer, lifetime):
    """Return a list of TLS-validated resolver nameservers extracted from an SVCB
    answer."""
    nameservers = []
    for info in _extract_nameservers_from_svcb(answer):
        try:
            if await info.ddr_tls_check_async(lifetime):
                nameservers.extend(info.nameservers)
        except Exception:
            # Deliberately best-effort: a resolver that cannot be reached or
            # validated is simply not offered.
            pass
    return nameservers
.
Edit
..
Edit
__init__.py
Edit
__pycache__
Edit
_asyncbackend.py
Edit
_asyncio_backend.py
Edit
_ddr.py
Edit
_features.py
Edit
_immutable_ctx.py
Edit
asyncbackend.py
Edit
asyncquery.py
Edit
asyncresolver.py
Edit
dnssec.py
Edit
dnssecalgs
Edit
dnssectypes.py
Edit
e164.py
Edit
edns.py
Edit
entropy.py
Edit
enum.py
Edit
exception.py
Edit
flags.py
Edit
grange.py
Edit
immutable.py
Edit
inet.py
Edit
ipv4.py
Edit
ipv6.py
Edit
message.py
Edit
name.py
Edit
namedict.py
Edit
nameserver.py
Edit
node.py
Edit
opcode.py
Edit
query.py
Edit
quic
Edit
rcode.py
Edit
rdata.py
Edit
rdataclass.py
Edit
rdataset.py
Edit
rdatatype.py
Edit
rdtypes
Edit
renderer.py
Edit
resolver.py
Edit
reversename.py
Edit
rrset.py
Edit
serial.py
Edit
set.py
Edit
tokenizer.py
Edit
transaction.py
Edit
tsig.py
Edit
tsigkeyring.py
Edit
ttl.py
Edit
update.py
Edit
version.py
Edit
versioned.py
Edit
win32util.py
Edit
wire.py
Edit
xfr.py
Edit
zone.py
Edit
zonefile.py
Edit
zonetypes.py
Edit