Update ipwhois-1.2.0

This commit is contained in:
JonnyWong16 2021-10-14 21:36:53 -07:00
commit 2c4cc34b2b
No known key found for this signature in database
GPG key ID: B1F1F9807184697A
13 changed files with 289 additions and 368 deletions

View file

@ -1,4 +1,4 @@
# Copyright (c) 2013-2019 Philip Hane
# Copyright (c) 2013-2020 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@ -28,6 +28,7 @@ from .utils import ipv4_lstrip_zeros, calculate_cidr, unique_everseen
from .net import ip_address
import logging
import json
from collections import namedtuple
log = logging.getLogger(__name__)
@ -553,7 +554,7 @@ class _RDAPNetwork(_RDAPCommon):
self.vars[v] = self.json[v].strip()
except (KeyError, ValueError):
except (KeyError, ValueError, AttributeError):
pass
@ -688,9 +689,95 @@ class RDAP:
raise NetError('The provided net parameter is not an instance of '
'ipwhois.net.Net')
def _get_entity(self, entity=None, roles=None, inc_raw=False, retry_count=3,
asn_data=None, bootstrap=False, rate_limit_timeout=120):
"""
The function for retrieving and parsing information for an entity via
RDAP (HTTP).
Args:
entity (:obj:`str`): The entity name to lookup.
roles (:obj:`dict`): The mapping of entity handles to roles.
inc_raw (:obj:`bool`, optional): Whether to include the raw
results in the returned dictionary. Defaults to False.
retry_count (:obj:`int`): The number of times to retry in case
socket errors, timeouts, connection resets, etc. are
encountered. Defaults to 3.
asn_data (:obj:`dict`): Result from
:obj:`ipwhois.asn.IPASN.lookup`. Optional if the bootstrap
parameter is True.
bootstrap (:obj:`bool`): If True, performs lookups via ARIN
bootstrap rather than lookups based on ASN data. Defaults to
False.
rate_limit_timeout (:obj:`int`): The number of seconds to wait
before retrying when a rate limit notice is returned via
rdap+json. Defaults to 120.
Returns:
namedtuple:
:result (dict): Consists of the fields listed in the
ipwhois.rdap._RDAPEntity dict. The raw result is included for
each object if the inc_raw parameter is True.
:roles (dict): The mapping of entity handles to roles.
"""
result = {}
if bootstrap:
entity_url = '{0}/entity/{1}'.format(
BOOTSTRAP_URL, entity)
else:
tmp_reg = asn_data['asn_registry']
entity_url = RIR_RDAP[tmp_reg]['entity_url']
entity_url = str(entity_url).format(entity)
try:
# RDAP entity query
response = self._net.get_http_json(
url=entity_url, retry_count=retry_count,
rate_limit_timeout=rate_limit_timeout
)
# Parse the entity
result_ent = _RDAPEntity(response)
result_ent.parse()
result = result_ent.vars
result['roles'] = None
try:
result['roles'] = roles[entity]
except KeyError: # pragma: no cover
pass
try:
for tmp in response['entities']:
if tmp['handle'] not in roles:
roles[tmp['handle']] = tmp['roles']
except (IndexError, KeyError):
pass
if inc_raw:
result['raw'] = response
except (HTTPLookupError, InvalidEntityObject):
pass
return_tuple = namedtuple('return_tuple', ['result', 'roles'])
return return_tuple(result, roles)
def lookup(self, inc_raw=False, retry_count=3, asn_data=None, depth=0,
excluded_entities=None, response=None, bootstrap=False,
rate_limit_timeout=120):
rate_limit_timeout=120, root_ent_check=True):
"""
The function for retrieving and parsing information for an IP
address via RDAP (HTTP).
@ -716,6 +803,9 @@ class RDAP:
rate_limit_timeout (:obj:`int`): The number of seconds to wait
before retrying when a rate limit notice is returned via
rdap+json. Defaults to 120.
root_ent_check (:obj:`bool`): If True, will perform
additional RDAP HTTP queries for missing entity data at the
root level. Defaults to True.
Returns:
dict: The IP RDAP lookup results
@ -792,10 +882,23 @@ class RDAP:
if ent['handle'] not in [results['entities'],
excluded_entities]:
result_ent = _RDAPEntity(ent)
result_ent.parse()
if 'vcardArray' not in ent and root_ent_check:
entity_object, roles = self._get_entity(
entity=ent['handle'],
roles=roles,
inc_raw=inc_raw,
retry_count=retry_count,
asn_data=asn_data,
bootstrap=bootstrap,
rate_limit_timeout=rate_limit_timeout
)
results['objects'][ent['handle']] = entity_object
results['objects'][ent['handle']] = result_ent.vars
else:
result_ent = _RDAPEntity(ent)
result_ent.parse()
results['objects'][ent['handle']] = result_ent.vars
results['entities'].append(ent['handle'])
@ -835,57 +938,18 @@ class RDAP:
list(new_objects.keys()) +
excluded_entities):
if bootstrap:
entity_url = '{0}/entity/{1}'.format(
BOOTSTRAP_URL, ent)
else:
tmp_reg = asn_data['asn_registry']
entity_url = RIR_RDAP[tmp_reg]['entity_url']
entity_url = str(entity_url).format(ent)
entity_object, roles = self._get_entity(
entity=ent,
roles=roles,
inc_raw=inc_raw,
retry_count=retry_count,
asn_data=asn_data,
bootstrap=bootstrap,
rate_limit_timeout=rate_limit_timeout
)
new_objects[ent] = entity_object
try:
# RDAP entity query
response = self._net.get_http_json(
url=entity_url, retry_count=retry_count,
rate_limit_timeout=rate_limit_timeout
)
# Parse the entity
result_ent = _RDAPEntity(response)
result_ent.parse()
new_objects[ent] = result_ent.vars
new_objects[ent]['roles'] = None
try:
new_objects[ent]['roles'] = roles[ent]
except KeyError: # pragma: no cover
pass
try:
for tmp in response['entities']:
if tmp['handle'] not in roles:
roles[tmp['handle']] = tmp['roles']
except (IndexError, KeyError):
pass
if inc_raw:
new_objects[ent]['raw'] = response
except (HTTPLookupError, InvalidEntityObject):
pass
except TypeError:
except (KeyError, TypeError):
pass