Update ipwhois to 1.1.0

This commit is contained in:
JonnyWong16 2019-11-23 18:55:41 -08:00
parent 4d6279a626
commit 84ce4758d1
13 changed files with 5041 additions and 649 deletions

View file

@ -1,4 +1,4 @@
# Copyright (c) 2013-2019 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@ -22,8 +22,8 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# Public package surface: re-export the exception hierarchy plus the two
# primary entry-point classes. Resolved to the post-upgrade state of the
# diff (the stale 0.13.0 version line is removed).
from .exceptions import *
from .net import Net
from .ipwhois import IPWhois

__version__ = '1.1.0'

956
lib/ipwhois/asn.py Normal file
View file

@ -0,0 +1,956 @@
# Copyright (c) 2013-2019 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import re
import sys
import copy
import logging
from .exceptions import (NetError, ASNRegistryError, ASNParseError,
ASNLookupError, HTTPLookupError, WhoisLookupError,
WhoisRateLimitError, ASNOriginLookupError)
if sys.version_info >= (3, 3): # pragma: no cover
from ipaddress import ip_network
else: # pragma: no cover
from ipaddr import IPNetwork as ip_network
log = logging.getLogger(__name__)
# Template for a parsed network block; deep-copied per network in
# ASNOrigin.get_nets_radb() before field values are filled in.
BASE_NET = {
    'cidr': None,
    'description': None,
    'maintainer': None,
    'updated': None,
    'source': None
}

# ASN origin whois (port 43) settings: server host plus field -> regex
# mappings used to parse the plain-text whois response. Values are captured
# in the named group 'val'; plain-text records terminate values with '\n'.
ASN_ORIGIN_WHOIS = {
    'radb': {
        'server': 'whois.radb.net',
        'fields': {
            'description': r'(descr):[^\S\n]+(?P<val>.+?)\n',
            'maintainer': r'(mnt-by):[^\S\n]+(?P<val>.+?)\n',
            'updated': r'(changed):[^\S\n]+(?P<val>.+?)\n',
            'source': r'(source):[^\S\n]+(?P<val>.+?)\n',
        }
    },
}

# ASN origin HTTP fallback settings: query URL, the POST form data sent to
# the RADB web query page, and field -> regex mappings. HTML responses
# terminate values with '<br>' rather than a newline.
ASN_ORIGIN_HTTP = {
    'radb': {
        'url': 'http://www.radb.net/query/',
        'form_data_asn_field': 'keywords',
        'form_data': {
            'advanced_query': '1',
            'query': 'Query',
            '-T option': 'inet-rtr',
            'ip_option': '',
            '-i': '1',
            '-i option': 'origin'
        },
        'fields': {
            'description': r'(descr):[^\S\n]+(?P<val>.+?)\<br\>',
            'maintainer': r'(mnt-by):[^\S\n]+(?P<val>.+?)\<br\>',
            'updated': r'(changed):[^\S\n]+(?P<val>.+?)\<br\>',
            'source': r'(source):[^\S\n]+(?P<val>.+?)\<br\>',
        }
    },
}
class IPASN:
    """
    The class for parsing ASN data for an IP address.

    Args:
        net (:obj:`ipwhois.net.Net`): A ipwhois.net.Net object.

    Raises:
        NetError: The parameter provided is not an instance of
            ipwhois.net.Net
    """

    def __init__(self, net):

        # Imported here (not module level) to avoid a circular import with
        # ipwhois.net, which itself imports from this module.
        from .net import (Net, ORG_MAP)
        from .whois import RIR_WHOIS

        # ipwhois.net.Net validation
        if isinstance(net, Net):

            self._net = net

        else:

            raise NetError('The provided net parameter is not an instance of '
                           'ipwhois.net.Net')

        # org handle -> RIR mapping and known RIR whois servers, used to
        # validate/resolve the registry in the parse_fields_* methods.
        self.org_map = ORG_MAP
        self.rir_whois = RIR_WHOIS

    def parse_fields_dns(self, response):
        """
        The function for parsing ASN fields from a dns response.

        Args:
            response (:obj:`str`): The response from the ASN dns server.

        Returns:
            dict: The ASN lookup results

            ::

                {
                    'asn' (str) - The Autonomous System Number
                    'asn_date' (str) - The ASN Allocation date
                    'asn_registry' (str) - The assigned ASN registry
                    'asn_cidr' (str) - The assigned ASN CIDR
                    'asn_country_code' (str) - The assigned ASN country code
                    'asn_description' (None) - Cannot retrieve with this
                        method.
                }

        Raises:
            ASNRegistryError: The ASN registry is not known.
            ASNParseError: ASN parsing failed.
        """

        try:

            # Cymru origin TXT record format: 'ASN | CIDR | CC | registry |
            # alloc date' (pipe separated; quotes/whitespace stripped below).
            temp = response.split('|')

            # Parse out the ASN information.
            ret = {'asn_registry': temp[3].strip(' \n')}

            # Reject registries outside the five known RIR whois servers.
            if ret['asn_registry'] not in self.rir_whois.keys():

                raise ASNRegistryError(
                    'ASN registry {0} is not known.'.format(
                        ret['asn_registry'])
                )

            ret['asn'] = temp[0].strip(' "\n')
            ret['asn_cidr'] = temp[1].strip(' \n')
            ret['asn_country_code'] = temp[2].strip(' \n').upper()
            ret['asn_date'] = temp[4].strip(' "\n')
            ret['asn_description'] = None

        except ASNRegistryError:

            raise

        except Exception as e:

            # Any other failure (index error, attribute error, ...) is
            # normalized to an ASNParseError; message is capped at 100 chars.
            raise ASNParseError('Parsing failed for "{0}" with exception: {1}.'
                                ''.format(response, e)[:100])

        return ret

    def _parse_fields_dns(self, *args, **kwargs):
        """
        Deprecated. This will be removed in a future release.
        """

        from warnings import warn
        warn('IPASN._parse_fields_dns() has been deprecated and will be '
             'removed. You should now use IPASN.parse_fields_dns().')
        return self.parse_fields_dns(*args, **kwargs)

    def parse_fields_verbose_dns(self, response):
        """
        The function for parsing ASN fields from a verbose dns response.

        Args:
            response (:obj:`str`): The response from the ASN dns server.

        Returns:
            dict: The ASN lookup results

            ::

                {
                    'asn' (str) - The Autonomous System Number
                    'asn_date' (str) - The ASN Allocation date
                    'asn_registry' (str) - The assigned ASN registry
                    'asn_cidr' (None) - Cannot retrieve with this method.
                    'asn_country_code' (str) - The assigned ASN country code
                    'asn_description' (str) - The ASN description
                }

        Raises:
            ASNRegistryError: The ASN registry is not known.
            ASNParseError: ASN parsing failed.
        """

        try:

            # Verbose record format differs from the origin record: 'ASN |
            # CC | registry | alloc date | description' (no CIDR field).
            temp = response.split('|')

            # Parse out the ASN information.
            ret = {'asn_registry': temp[2].strip(' \n')}

            if ret['asn_registry'] not in self.rir_whois.keys():

                raise ASNRegistryError(
                    'ASN registry {0} is not known.'.format(
                        ret['asn_registry'])
                )

            ret['asn'] = temp[0].strip(' "\n')
            ret['asn_cidr'] = None
            ret['asn_country_code'] = temp[1].strip(' \n').upper()
            ret['asn_date'] = temp[3].strip(' \n')
            ret['asn_description'] = temp[4].strip(' "\n')

        except ASNRegistryError:

            raise

        except Exception as e:

            raise ASNParseError('Parsing failed for "{0}" with exception: {1}.'
                                ''.format(response, e)[:100])

        return ret

    def parse_fields_whois(self, response):
        """
        The function for parsing ASN fields from a whois response.

        Args:
            response (:obj:`str`): The response from the ASN whois server.

        Returns:
            dict: The ASN lookup results

            ::

                {
                    'asn' (str) - The Autonomous System Number
                    'asn_date' (str) - The ASN Allocation date
                    'asn_registry' (str) - The assigned ASN registry
                    'asn_cidr' (str) - The assigned ASN CIDR
                    'asn_country_code' (str) - The assigned ASN country code
                    'asn_description' (str) - The ASN description
                }

        Raises:
            ASNRegistryError: The ASN registry is not known.
            ASNParseError: ASN parsing failed.
        """

        try:

            # Cymru whois (port 43) verbose format: 'ASN | IP | CIDR | CC |
            # registry | alloc date | description'.
            temp = response.split('|')

            # Parse out the ASN information.
            ret = {'asn_registry': temp[4].strip(' \n')}

            if ret['asn_registry'] not in self.rir_whois.keys():

                raise ASNRegistryError(
                    'ASN registry {0} is not known.'.format(
                        ret['asn_registry'])
                )

            ret['asn'] = temp[0].strip(' \n')
            ret['asn_cidr'] = temp[2].strip(' \n')
            ret['asn_country_code'] = temp[3].strip(' \n').upper()
            ret['asn_date'] = temp[5].strip(' \n')
            ret['asn_description'] = temp[6].strip(' \n')

        except ASNRegistryError:

            raise

        except Exception as e:

            raise ASNParseError('Parsing failed for "{0}" with exception: {1}.'
                                ''.format(response, e)[:100])

        return ret

    def _parse_fields_whois(self, *args, **kwargs):
        """
        Deprecated. This will be removed in a future release.
        """

        from warnings import warn
        warn('IPASN._parse_fields_whois() has been deprecated and will be '
             'removed. You should now use IPASN.parse_fields_whois().')
        return self.parse_fields_whois(*args, **kwargs)

    def parse_fields_http(self, response, extra_org_map=None):
        """
        The function for parsing ASN fields from a http response.

        Args:
            response (:obj:`str`): The response from the ASN http server.
            extra_org_map (:obj:`dict`): Dictionary mapping org handles to
                RIRs. This is for limited cases where ARIN REST (ASN fallback
                HTTP lookup) does not show an RIR as the org handle e.g., DNIC
                (which is now the built in ORG_MAP) e.g., {'DNIC': 'arin'}.
                Valid RIR values are (note the case-sensitive - this is meant
                to match the REST result): 'ARIN', 'RIPE', 'apnic', 'lacnic',
                'afrinic'. Defaults to None.

        Returns:
            dict: The ASN lookup results

            ::

                {
                    'asn' (None) - Cannot retrieve with this method.
                    'asn_date' (None) - Cannot retrieve with this method.
                    'asn_registry' (str) - The assigned ASN registry
                    'asn_cidr' (None) - Cannot retrieve with this method.
                    'asn_country_code' (None) - Cannot retrieve with this
                        method.
                    'asn_description' (None) - Cannot retrieve with this
                        method.
                }

        Raises:
            ASNRegistryError: The ASN registry is not known.
            ASNParseError: ASN parsing failed.
        """

        # Set the org_map. Map the orgRef handle to an RIR.
        org_map = self.org_map.copy()
        try:

            # Overlay caller-supplied mappings; invalid input (non-dict,
            # None, ...) is deliberately ignored rather than raised.
            org_map.update(extra_org_map)

        except (TypeError, ValueError, IndexError, KeyError):

            pass

        try:

            asn_data = {
                'asn_registry': None,
                'asn': None,
                'asn_cidr': None,
                'asn_country_code': None,
                'asn_date': None,
                'asn_description': None
            }

            try:

                # ARIN REST returns a single net dict or a list of nets;
                # normalize to a list so iteration below is uniform.
                net_list = response['nets']['net']

                if not isinstance(net_list, list):

                    net_list = [net_list]

            except (KeyError, TypeError):

                log.debug('No networks found')
                net_list = []

            # Walk nets most-specific first (reversed); stop at the first
            # org handle that maps to a known RIR.
            for n in reversed(net_list):

                try:

                    asn_data['asn_registry'] = (
                        org_map[n['orgRef']['@handle'].upper()]
                    )

                except KeyError as e:

                    log.debug('Could not parse ASN registry via HTTP: '
                              '{0}'.format(str(e)))
                    continue

                break

            if not asn_data['asn_registry']:

                log.debug('Could not parse ASN registry via HTTP')
                raise ASNRegistryError('ASN registry lookup failed.')

        except ASNRegistryError:

            raise

        except Exception as e:  # pragma: no cover

            raise ASNParseError('Parsing failed for "{0}" with exception: {1}.'
                                ''.format(response, e)[:100])

        return asn_data

    def _parse_fields_http(self, *args, **kwargs):
        """
        Deprecated. This will be removed in a future release.
        """

        from warnings import warn
        warn('IPASN._parse_fields_http() has been deprecated and will be '
             'removed. You should now use IPASN.parse_fields_http().')
        return self.parse_fields_http(*args, **kwargs)

    def lookup(self, inc_raw=False, retry_count=3, asn_alts=None,
               extra_org_map=None, asn_methods=None,
               get_asn_description=True):
        """
        The wrapper function for retrieving and parsing ASN information for an
        IP address.

        Args:
            inc_raw (:obj:`bool`): Whether to include the raw results in the
                returned dictionary. Defaults to False.
            retry_count (:obj:`int`): The number of times to retry in case
                socket errors, timeouts, connection resets, etc. are
                encountered. Defaults to 3.
            asn_alts (:obj:`list`): Additional lookup types to attempt if the
                ASN dns lookup fails. Allow permutations must be enabled.
                Defaults to all ['whois', 'http']. *WARNING* deprecated in
                favor of new argument asn_methods. Defaults to None.
            extra_org_map (:obj:`dict`): Mapping org handles to RIRs. This is
                for limited cases where ARIN REST (ASN fallback HTTP lookup)
                does not show an RIR as the org handle e.g., DNIC (which is
                now the built in ORG_MAP) e.g., {'DNIC': 'arin'}. Valid RIR
                values are (note the case-sensitive - this is meant to match
                the REST result): 'ARIN', 'RIPE', 'apnic', 'lacnic', 'afrinic'
                Defaults to None.
            asn_methods (:obj:`list`): ASN lookup types to attempt, in order.
                If None, defaults to all: ['dns', 'whois', 'http'].
            get_asn_description (:obj:`bool`): Whether to run an additional
                query when pulling ASN information via dns, in order to get
                the ASN description. Defaults to True.

        Returns:
            dict: The ASN lookup results

            ::

                {
                    'asn' (str) - The Autonomous System Number
                    'asn_date' (str) - The ASN Allocation date
                    'asn_registry' (str) - The assigned ASN registry
                    'asn_cidr' (str) - The assigned ASN CIDR
                    'asn_country_code' (str) - The assigned ASN country code
                    'asn_description' (str) - The ASN description
                    'raw' (str) - Raw ASN results if the inc_raw parameter is
                        True.
                }

        Raises:
            ValueError: methods argument requires one of dns, whois, http.
            ASNRegistryError: ASN registry does not match.
        """

        # Resolve the ordered list of lookup methods to attempt, honoring
        # the deprecated asn_alts argument for backward compatibility.
        if asn_methods is None:

            if asn_alts is None:

                lookups = ['dns', 'whois', 'http']

            else:

                from warnings import warn
                warn('IPASN.lookup() asn_alts argument has been deprecated '
                     'and will be removed. You should now use the asn_methods '
                     'argument.')
                lookups = ['dns'] + asn_alts

        else:

            if {'dns', 'whois', 'http'}.isdisjoint(asn_methods):

                raise ValueError('methods argument requires at least one of '
                                 'dns, whois, http.')

            lookups = asn_methods

        response = None
        asn_data = None
        dns_success = False
        for index, lookup_method in enumerate(lookups):

            # Fallback methods (index > 0) require allow_permutations when
            # asn_methods was not explicitly provided (legacy behavior).
            if index > 0 and not asn_methods and not (
                    self._net.allow_permutations):

                raise ASNRegistryError('ASN registry lookup failed. '
                                       'Permutations not allowed.')

            if lookup_method == 'dns':

                try:

                    # Total resolver lifetime = per-try timeout * retries
                    # (falls back to 1 when retry_count is 0).
                    self._net.dns_resolver.lifetime = (
                        self._net.dns_resolver.timeout * (
                            retry_count and retry_count or 1
                        )
                    )
                    response = self._net.get_asn_dns()
                    asn_data_list = []
                    for asn_entry in response:

                        asn_data_list.append(self.parse_fields_dns(
                            str(asn_entry)))

                    # Iterate through the parsed ASN results to find the
                    # smallest CIDR
                    asn_data = asn_data_list.pop(0)
                    try:

                        prefix_len = ip_network(asn_data['asn_cidr']).prefixlen
                        for asn_parsed in asn_data_list:

                            prefix_len_comp = ip_network(
                                asn_parsed['asn_cidr']).prefixlen
                            if prefix_len_comp > prefix_len:

                                asn_data = asn_parsed
                                prefix_len = prefix_len_comp

                    except (KeyError, ValueError):  # pragma: no cover

                        pass

                    dns_success = True
                    break

                except (ASNLookupError, ASNRegistryError) as e:

                    log.debug('ASN DNS lookup failed: {0}'.format(e))
                    pass

            elif lookup_method == 'whois':

                try:

                    response = self._net.get_asn_whois(retry_count)
                    asn_data = self.parse_fields_whois(
                        response)  # pragma: no cover
                    break

                except (ASNLookupError, ASNRegistryError) as e:

                    log.debug('ASN WHOIS lookup failed: {0}'.format(e))
                    pass

            elif lookup_method == 'http':

                try:

                    response = self._net.get_asn_http(
                        retry_count=retry_count
                    )
                    asn_data = self.parse_fields_http(response,
                                                      extra_org_map)
                    break

                except (ASNLookupError, ASNRegistryError) as e:

                    log.debug('ASN HTTP lookup failed: {0}'.format(e))
                    pass

        if asn_data is None:

            raise ASNRegistryError('ASN lookup failed with no more methods to '
                                   'try.')

        # The verbose description query is only meaningful after a DNS hit
        # (the whois/http paths already populate asn_description).
        if get_asn_description and dns_success:

            try:

                response = self._net.get_asn_verbose_dns('AS{0}'.format(
                    asn_data['asn']))
                asn_verbose_data = self.parse_fields_verbose_dns(response)
                asn_data['asn_description'] = asn_verbose_data[
                    'asn_description']

            except (ASNLookupError, ASNRegistryError) as e:  # pragma: no cover

                log.debug('ASN DNS verbose lookup failed: {0}'.format(e))
                pass

        if inc_raw:

            # NOTE(review): 'raw' holds the response of whichever query ran
            # last (the verbose DNS one when get_asn_description applies).
            asn_data['raw'] = response

        return asn_data
class ASNOrigin:
    """
    The class for parsing ASN origin whois data

    Args:
        net (:obj:`ipwhois.net.Net`): A ipwhois.net.Net object.

    Raises:
        NetError: The parameter provided is not an instance of
            ipwhois.net.Net
    """

    def __init__(self, net):

        # Imported here (not module level) to avoid a circular import with
        # ipwhois.net.
        from .net import Net

        # ipwhois.net.Net validation
        if isinstance(net, Net):

            self._net = net

        else:

            raise NetError('The provided net parameter is not an instance of '
                           'ipwhois.net.Net')

    def parse_fields(self, response, fields_dict, net_start=None,
                     net_end=None, field_list=None):
        """
        The function for parsing ASN whois fields from a data input.

        Args:
            response (:obj:`str`): The response from the whois/rwhois server.
            fields_dict (:obj:`dict`): Mapping of fields->regex search values.
            net_start (:obj:`int`): The starting point of the network (if
                parsing multiple networks). Defaults to None.
            net_end (:obj:`int`): The ending point of the network (if parsing
                multiple networks). Defaults to None.
            field_list (:obj:`list`): If provided, a list of fields to parse:
                ['description', 'maintainer', 'updated', 'source']
                If None, defaults to all fields.

        Returns:
            dict: A dictionary of fields provided in fields_dict.
        """

        ret = {}

        if not field_list:

            field_list = ['description', 'maintainer', 'updated', 'source']

        generate = ((field, pattern) for (field, pattern) in
                    fields_dict.items() if field in field_list)

        for field, pattern in generate:

            pattern = re.compile(
                str(pattern),
                re.DOTALL
            )

            # Restrict the regex scan to this network's section of the
            # response. NOTE(review): finditer(string, pos, endpos) — so the
            # scan runs from net_end (this net's end) up to net_start (the
            # next net's start); confirm against the caller in lookup().
            if net_start is not None:

                match = pattern.finditer(response, net_end, net_start)

            elif net_end is not None:

                match = pattern.finditer(response, net_end)

            else:

                match = pattern.finditer(response)

            values = []
            sub_section_end = None
            for m in match:

                # Only collect values from one contiguous run of matching
                # lines; a gap means we've crossed into another record.
                if sub_section_end:

                    if sub_section_end != (m.start() - 1):

                        break

                try:

                    values.append(m.group('val').strip())

                except IndexError:  # pragma: no cover

                    pass

                sub_section_end = m.end()

            if len(values) > 0:

                value = None
                try:

                    # Only the first matched value is kept per field.
                    value = values[0]

                except ValueError as e:  # pragma: no cover

                    log.debug('ASN origin Whois field parsing failed for {0}: '
                              '{1}'.format(field, e))
                    pass

                ret[field] = value

        return ret

    def _parse_fields(self, *args, **kwargs):
        """
        Deprecated. This will be removed in a future release.
        """

        from warnings import warn
        warn('ASNOrigin._parse_fields() has been deprecated and will be '
             'removed. You should now use ASNOrigin.parse_fields().')
        return self.parse_fields(*args, **kwargs)

    def get_nets_radb(self, response, is_http=False):
        """
        The function for parsing network blocks from ASN origin data.

        Args:
            response (:obj:`str`): The response from the RADB whois/http
                server.
            is_http (:obj:`bool`): If the query is RADB HTTP instead of whois,
                set to True. Defaults to False.

        Returns:
            list: A list of network block dictionaries

            ::

                [{
                    'cidr' (str) - The assigned CIDR
                    'start' (int) - The index for the start of the parsed
                        network block
                    'end' (int) - The index for the end of the parsed network
                        block
                }]
        """

        nets = []

        # HTML responses terminate values with <br>; plain whois with EOL.
        # Both 'route:' (IPv4) and 'route6:' (IPv6) records are matched.
        if is_http:
            regex = r'route(?:6)?:[^\S\n]+(?P<val>.+?)<br>'
        else:
            regex = r'^route(?:6)?:[^\S\n]+(?P<val>.+|.+)$'

        # Iterate through all of the networks found, storing the CIDR value
        # and the start and end positions.
        for match in re.finditer(
            regex,
            response,
            re.MULTILINE
        ):

            try:

                net = copy.deepcopy(BASE_NET)
                net['cidr'] = match.group(1).strip()
                net['start'] = match.start()
                net['end'] = match.end()
                nets.append(net)

            except ValueError:  # pragma: no cover

                pass

        return nets

    def _get_nets_radb(self, *args, **kwargs):
        """
        Deprecated. This will be removed in a future release.
        """

        from warnings import warn
        warn('ASNOrigin._get_nets_radb() has been deprecated and will be '
             'removed. You should now use ASNOrigin.get_nets_radb().')
        return self.get_nets_radb(*args, **kwargs)

    def lookup(self, asn=None, inc_raw=False, retry_count=3, response=None,
               field_list=None, asn_alts=None, asn_methods=None):
        """
        The function for retrieving and parsing ASN origin whois information
        via port 43/tcp (WHOIS).

        Args:
            asn (:obj:`str`): The ASN (required).
            inc_raw (:obj:`bool`): Whether to include the raw results in the
                returned dictionary. Defaults to False.
            retry_count (:obj:`int`): The number of times to retry in case
                socket errors, timeouts, connection resets, etc. are
                encountered. Defaults to 3.
            response (:obj:`str`): Optional response object, this bypasses the
                Whois lookup. Defaults to None.
            field_list (:obj:`list`): If provided, fields to parse:
                ['description', 'maintainer', 'updated', 'source']
                If None, defaults to all.
            asn_alts (:obj:`list`): Additional lookup types to attempt if the
                ASN whois lookup fails. If None, defaults to all ['http'].
                *WARNING* deprecated in favor of new argument asn_methods.
            asn_methods (:obj:`list`): ASN lookup types to attempt, in order.
                If None, defaults to all ['whois', 'http'].

        Returns:
            dict: The ASN origin lookup results

            ::

                {
                    'query' (str) - The Autonomous System Number
                    'nets' (list) - Dictionaries containing network
                        information which consists of the fields listed in the
                        ASN_ORIGIN_WHOIS dictionary.
                    'raw' (str) - Raw ASN origin whois results if the inc_raw
                        parameter is True.
                }

        Raises:
            ValueError: methods argument requires one of whois, http.
            ASNOriginLookupError: ASN origin lookup failed.
        """

        # Normalize a bare number ('15169') to the 'AS15169' form.
        if asn[0:2] != 'AS':

            asn = 'AS{0}'.format(asn)

        # Resolve the ordered list of lookup methods, honoring the
        # deprecated asn_alts argument for backward compatibility.
        if asn_methods is None:

            if asn_alts is None:

                lookups = ['whois', 'http']

            else:

                from warnings import warn
                warn('ASNOrigin.lookup() asn_alts argument has been deprecated'
                     ' and will be removed. You should now use the asn_methods'
                     ' argument.')
                lookups = ['whois'] + asn_alts

        else:

            if {'whois', 'http'}.isdisjoint(asn_methods):

                raise ValueError('methods argument requires at least one of '
                                 'whois, http.')

            lookups = asn_methods

        # Create the return dictionary.
        results = {
            'query': asn,
            'nets': [],
            'raw': None
        }

        is_http = False

        # Only fetch the response if we haven't already.
        if response is None:

            # NOTE(review): there is no break after a successful whois fetch,
            # so when 'http' follows in lookups the HTTP method still runs
            # and overwrites response — confirm against upstream intent.
            for index, lookup_method in enumerate(lookups):

                if lookup_method == 'whois':

                    try:

                        log.debug('Response not given, perform ASN origin '
                                  'WHOIS lookup for {0}'.format(asn))

                        # Retrieve the whois data.
                        response = self._net.get_asn_origin_whois(
                            asn=asn, retry_count=retry_count
                        )

                    except (WhoisLookupError, WhoisRateLimitError) as e:

                        log.debug('ASN origin WHOIS lookup failed: {0}'
                                  ''.format(e))
                        pass

                elif lookup_method == 'http':

                    try:

                        log.debug('Response not given, perform ASN origin '
                                  'HTTP lookup for: {0}'.format(asn))

                        # Inject the ASN into the RADB web form data under
                        # the configured field name.
                        tmp = ASN_ORIGIN_HTTP['radb']['form_data']
                        tmp[str(ASN_ORIGIN_HTTP['radb']['form_data_asn_field']
                                )] = asn
                        response = self._net.get_http_raw(
                            url=ASN_ORIGIN_HTTP['radb']['url'],
                            retry_count=retry_count,
                            request_type='POST',
                            form_data=tmp
                        )

                        is_http = True  # pragma: no cover

                    except HTTPLookupError as e:

                        log.debug('ASN origin HTTP lookup failed: {0}'
                                  ''.format(e))
                        pass

            if response is None:

                raise ASNOriginLookupError('ASN origin lookup failed with no '
                                           'more methods to try.')

        # If inc_raw parameter is True, add the response to return dictionary.
        if inc_raw:

            results['raw'] = response

        nets = []
        nets_response = self.get_nets_radb(response, is_http)
        nets.extend(nets_response)

        if is_http:  # pragma: no cover
            fields = ASN_ORIGIN_HTTP
        else:
            fields = ASN_ORIGIN_WHOIS

        # Iterate through all of the network sections and parse out the
        # appropriate fields for each.
        log.debug('Parsing ASN origin data')
        for index, net in enumerate(nets):

            # Bound this net's section by the start of the next net.
            section_end = None
            if index + 1 < len(nets):

                section_end = nets[index + 1]['start']

            temp_net = self.parse_fields(
                response,
                fields['radb']['fields'],
                section_end,
                net['end'],
                field_list
            )

            # Merge the net dictionaries.
            net.update(temp_net)

            # The start and end values are no longer needed.
            del net['start'], net['end']

        # Add the networks to the return dictionary.
        results['nets'] = nets

        return results

View file

@ -1,4 +1,4 @@
# Copyright (c) 2013-2019 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@ -23,64 +23,89 @@
# POSSIBILITY OF SUCH DAMAGE.
# Resolved from the diff hunk to the coherent post-change state: every
# ipwhois exception now derives from a single package-level base class so
# callers can catch BaseIpwhoisException to handle any library error.
class BaseIpwhoisException(Exception):
    """
    Base exception for all the ipwhois custom ones.
    """


class NetError(BaseIpwhoisException):
    """
    An Exception for when a parameter provided is not an instance of
    ipwhois.net.Net.
    """


class IPDefinedError(BaseIpwhoisException):
    """
    An Exception for when the IP is defined (does not need to be resolved).
    """


class ASNLookupError(BaseIpwhoisException):
    """
    An Exception for when the ASN lookup failed.
    """


class ASNRegistryError(BaseIpwhoisException):
    """
    An Exception for when the ASN registry does not match one of the five
    expected values (arin, ripencc, apnic, lacnic, afrinic).
    """


class ASNParseError(BaseIpwhoisException):
    """
    An Exception for when the ASN parsing failed.
    """


class ASNOriginLookupError(BaseIpwhoisException):
    """
    An Exception for when the ASN origin lookup failed.
    """


class HostLookupError(BaseIpwhoisException):
    """
    An Exception for when the host lookup failed.
    """


class BlacklistError(BaseIpwhoisException):
    """
    An Exception for when the server is in a blacklist.
    """


class WhoisLookupError(BaseIpwhoisException):
    """
    An Exception for when the whois lookup failed.
    """


class WhoisRateLimitError(BaseIpwhoisException):
    """
    An Exception for when Whois queries exceed the NIC's request limit and
    have exhausted all retries.
    """


class HTTPLookupError(BaseIpwhoisException):
    """
    An Exception for when the RDAP lookup failed.
    """


class HTTPRateLimitError(BaseIpwhoisException):
    """
    An Exception for when HTTP queries exceed the NIC's request limit and
    have exhausted all retries.
    """


class InvalidEntityContactObject(BaseIpwhoisException):
    """
    An Exception for when JSON output is not an RDAP entity contact
    information object: https://tools.ietf.org/html/rfc7095
    """


class InvalidNetworkObject(BaseIpwhoisException):
    """
    An Exception for when JSON output is not an RDAP network object:
    https://tools.ietf.org/html/rfc7483#section-5.4
    """


class InvalidEntityObject(BaseIpwhoisException):
    """
    An Exception for when JSON output is not an RDAP entity object:
    https://tools.ietf.org/html/rfc7483#section-5.1
    """

457
lib/ipwhois/experimental.py Normal file
View file

@ -0,0 +1,457 @@
# Copyright (c) 2017-2019 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import socket
import logging
import time
from collections import namedtuple
from .exceptions import (ASNLookupError, HTTPLookupError, HTTPRateLimitError,
ASNRegistryError)
from .asn import IPASN
from .net import (CYMRU_WHOIS, Net)
from .rdap import RDAP
from .utils import unique_everseen
log = logging.getLogger(__name__)
def get_bulk_asn_whois(addresses=None, retry_count=3, timeout=120):
    """
    The function for retrieving ASN information for multiple IP addresses from
    Cymru via port 43/tcp (WHOIS).

    Args:
        addresses (:obj:`list` of :obj:`str`): IP addresses to lookup.
        retry_count (:obj:`int`): The number of times to retry in case socket
            errors, timeouts, connection resets, etc. are encountered.
            Defaults to 3.
        timeout (:obj:`int`): The default timeout for socket connections in
            seconds. Defaults to 120.

    Returns:
        str: The raw ASN bulk data, new line separated.

    Raises:
        ValueError: addresses argument must be a list of IPv4/v6 address
            strings.
        ASNLookupError: The ASN bulk lookup failed.
    """

    if not isinstance(addresses, list):

        raise ValueError('addresses argument must be a list of IPv4/v6 '
                         'address strings.')

    conn = None
    try:

        # Create the connection for the Cymru whois query.
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        conn.settimeout(timeout)
        log.debug('ASN bulk query initiated.')
        conn.connect((CYMRU_WHOIS, 43))

        # Query the Cymru whois server with one begin/end bulk request, and
        # store the results.
        conn.sendall((
            ' -r -a -c -p -f begin\n{0}\nend'.format(
                '\n'.join(addresses))
        ).encode())

        data = ''
        while True:

            d = conn.recv(4096).decode()
            data += d

            if not d:

                break

        return str(data)

    except (socket.timeout, socket.error) as e:  # pragma: no cover

        log.debug('ASN bulk query socket error: {0}'.format(e))
        if retry_count > 0:

            log.debug('ASN bulk query retrying (count: {0})'.format(
                str(retry_count)))
            return get_bulk_asn_whois(addresses, retry_count - 1, timeout)

        else:

            raise ASNLookupError('ASN bulk lookup failed.')

    # Was a bare `except:` — that also swallowed KeyboardInterrupt and
    # SystemExit; narrow to Exception and preserve the cause for debugging.
    except Exception as e:  # pragma: no cover

        raise ASNLookupError('ASN bulk lookup failed.') from e

    finally:

        # Close the socket on every path; the original leaked it on errors.
        if conn is not None:

            conn.close()
def bulk_lookup_rdap(addresses=None, inc_raw=False, retry_count=3, depth=0,
                     excluded_entities=None, rate_limit_timeout=60,
                     socket_timeout=10, asn_timeout=240, proxy_openers=None):
    """
    The function for bulk retrieving and parsing whois information for a list
    of IP addresses via HTTP (RDAP). This bulk lookup method uses bulk
    ASN Whois lookups first to retrieve the ASN for each IP. It then optimizes
    RDAP queries to achieve the fastest overall time, accounting for
    rate-limiting RIRs.

    Args:
        addresses (:obj:`list` of :obj:`str`): IP addresses to lookup.
        inc_raw (:obj:`bool`, optional): Whether to include the raw whois
            results in the returned dictionary. Defaults to False.
        retry_count (:obj:`int`): The number of times to retry in case socket
            errors, timeouts, connection resets, etc. are encountered.
            Defaults to 3.
        depth (:obj:`int`): How many levels deep to run queries when additional
            referenced objects are found. Defaults to 0.
        excluded_entities (:obj:`list` of :obj:`str`): Entity handles to not
            perform lookups. Defaults to None.
        rate_limit_timeout (:obj:`int`): The number of seconds to wait before
            retrying when a rate limit notice is returned via rdap+json.
            Defaults to 60.
        socket_timeout (:obj:`int`): The default timeout for socket
            connections in seconds. Defaults to 10.
        asn_timeout (:obj:`int`): The default timeout for bulk ASN lookups in
            seconds. Defaults to 240.
        proxy_openers (:obj:`list` of :obj:`OpenerDirector`): Proxy openers
            for single/rotating proxy support. Defaults to None.

    Returns:
        namedtuple:

        :results (dict): IP address keys with the values as dictionaries
            returned by IPWhois.lookup_rdap().
        :stats (dict): Stats for the lookups:

        ::

            {
                'ip_input_total' (int) - The total number of addresses
                    originally provided for lookup via the addresses argument.
                'ip_unique_total' (int) - The total number of unique addresses
                    found in the addresses argument.
                'ip_lookup_total' (int) - The total number of addresses that
                    lookups were attempted for, excluding any that failed ASN
                    registry checks.
                'lacnic' (dict) -
                {
                    'failed' (list) - The addresses that failed to lookup.
                        Excludes any that failed initially, but succeeded after
                        further retries.
                    'rate_limited' (list) - The addresses that encountered
                        rate-limiting. Unless an address is also in 'failed',
                        it eventually succeeded.
                    'total' (int) - The total number of addresses belonging to
                        this RIR that lookups were attempted for.
                }
                'ripencc' (dict) - Same as 'lacnic' above.
                'apnic' (dict) - Same as 'lacnic' above.
                'afrinic' (dict) - Same as 'lacnic' above.
                'arin' (dict) - Same as 'lacnic' above.
                'unallocated_addresses' (list) - The addresses that are
                    unallocated/failed ASN lookups. These can be addresses that
                    are not listed for one of the 5 RIRs (other). No attempt
                    was made to perform an RDAP lookup for these.
            }

    Raises:
        ASNLookupError: The ASN bulk lookup failed, cannot proceed with bulk
            RDAP lookup.
    """

    if not isinstance(addresses, list):

        raise ValueError('addresses must be a list of IP address strings')

    # Initialize the dicts/lists
    results = {}
    failed_lookups_dict = {}
    rated_lookups = []
    stats = {
        'ip_input_total': len(addresses),
        'ip_unique_total': 0,
        'ip_lookup_total': 0,
        'lacnic': {'failed': [], 'rate_limited': [], 'total': 0},
        'ripencc': {'failed': [], 'rate_limited': [], 'total': 0},
        'apnic': {'failed': [], 'rate_limited': [], 'total': 0},
        'afrinic': {'failed': [], 'rate_limited': [], 'total': 0},
        'arin': {'failed': [], 'rate_limited': [], 'total': 0},
        'unallocated_addresses': []
    }
    asn_parsed_results = {}

    if proxy_openers is None:

        proxy_openers = [None]

    proxy_openers_copy = iter(proxy_openers)

    # Make sure addresses is unique
    unique_ip_list = list(unique_everseen(addresses))

    # Get the unique count to return
    stats['ip_unique_total'] = len(unique_ip_list)

    # This is needed for iteration order
    rir_keys_ordered = ['lacnic', 'ripencc', 'apnic', 'afrinic', 'arin']

    # First query the ASN data for all IPs, can raise ASNLookupError, no catch
    bulk_asn = get_bulk_asn_whois(unique_ip_list, timeout=asn_timeout)

    # ASN results are returned as string, parse lines to list and remove first
    asn_result_list = bulk_asn.split('\n')
    del asn_result_list[0]

    # We need to instantiate IPASN, which currently needs a Net object,
    # IP doesn't matter here
    net = Net('1.2.3.4')
    ipasn = IPASN(net)

    # Iterate each IP ASN result, and add valid RIR results to
    # asn_parsed_results for RDAP lookups
    for asn_result in asn_result_list:

        temp = asn_result.split('|')

        # Not a valid entry, move on to next
        if len(temp) == 1:

            continue

        ip = temp[1].strip()

        # We need this since ASN bulk lookup is returning duplicates
        # This is an issue on the Cymru end
        if ip in asn_parsed_results.keys():  # pragma: no cover

            continue

        try:

            # Parse into a dedicated variable. The previous code rebound the
            # shared results dict here, clobbering the final return data.
            asn_temp = ipasn.parse_fields_whois(asn_result)

        except ASNRegistryError:  # pragma: no cover

            continue

        # Add valid IP ASN result to asn_parsed_results for RDAP lookup
        asn_parsed_results[ip] = asn_temp
        stats[asn_temp['asn_registry']]['total'] += 1

    # Set the list of IPs that are not allocated/failed ASN lookup
    stats['unallocated_addresses'] = list(k for k in addresses if k not in
                                          asn_parsed_results)

    # Set the total lookup count after unique IP and ASN result filtering
    stats['ip_lookup_total'] = len(asn_parsed_results)

    # Track the total number of LACNIC queries left. This is tracked in order
    # to ensure the 9 priority LACNIC queries/min don't go into infinite loop
    lacnic_total_left = stats['lacnic']['total']

    # Set the start time, this value is updated when the rate limit is reset
    old_time = time.time()

    # Rate limit tracking dict for all RIRs
    rate_tracker = {
        'lacnic': {'time': old_time, 'count': 0},
        'ripencc': {'time': old_time, 'count': 0},
        'apnic': {'time': old_time, 'count': 0},
        'afrinic': {'time': old_time, 'count': 0},
        'arin': {'time': old_time, 'count': 0}
    }

    # Iterate all of the IPs to perform RDAP lookups until none are left
    while len(asn_parsed_results) > 0:

        # Sequentially run through each RIR to minimize lookups in a row to
        # the same RIR.
        for rir in rir_keys_ordered:

            # If there are still LACNIC IPs left to lookup and the rate limit
            # hasn't been reached, skip to find a LACNIC IP to lookup
            if (
                rir != 'lacnic' and lacnic_total_left > 0 and
                (rate_tracker['lacnic']['count'] != 9 or
                 (time.time() - rate_tracker['lacnic']['time']
                  ) >= rate_limit_timeout
                 )
            ):  # pragma: no cover

                continue

            # If the RIR rate limit has been reached and hasn't expired,
            # move on to the next RIR
            if (
                rate_tracker[rir]['count'] == 9 and (
                    (time.time() - rate_tracker[rir]['time']
                     ) < rate_limit_timeout)
            ):  # pragma: no cover

                continue

            # If the RIR rate limit has expired, reset the count/timer
            # and perform the lookup
            elif ((time.time() - rate_tracker[rir]['time']
                   ) >= rate_limit_timeout):  # pragma: no cover

                rate_tracker[rir]['count'] = 0
                rate_tracker[rir]['time'] = time.time()

            # Create a copy of the lookup IP dict so we can modify on
            # successful/failed queries. Loop each IP until it matches the
            # correct RIR in the parent loop, and attempt lookup
            tmp_dict = asn_parsed_results.copy()

            for ip, asn_data in tmp_dict.items():

                # Check to see if IP matches parent loop RIR for lookup
                if asn_data['asn_registry'] == rir:

                    log.debug('Starting lookup for IP: {0} '
                              'RIR: {1}'.format(ip, rir))

                    # Add to count for rate-limit tracking only for LACNIC,
                    # since we have not seen aggressive rate-limiting from the
                    # other RIRs yet
                    if rir == 'lacnic':

                        rate_tracker[rir]['count'] += 1

                    # Get the next proxy opener to use, or None
                    try:

                        opener = next(proxy_openers_copy)

                    # Start at the beginning if all have been used
                    except StopIteration:

                        proxy_openers_copy = iter(proxy_openers)
                        opener = next(proxy_openers_copy)

                    # Instantiate the objects needed for the RDAP lookup
                    net = Net(ip, timeout=socket_timeout, proxy_opener=opener)
                    rdap = RDAP(net)

                    try:

                        # Perform the RDAP lookup. retry_count is set to 0
                        # here since we handle that in this function
                        rdap_result = rdap.lookup(
                            inc_raw=inc_raw, retry_count=0, asn_data=asn_data,
                            depth=depth, excluded_entities=excluded_entities
                        )

                        log.debug('Successful lookup for IP: {0} '
                                  'RIR: {1}'.format(ip, rir))

                        # Lookup was successful: store the result under its IP
                        # key (the previous code rebound results to the RDAP
                        # dict and then made a self-referential entry via
                        # results[ip] = results). Set the nir key to None as
                        # this is not supported (yet - requires more queries)
                        results[ip] = rdap_result
                        results[ip]['nir'] = None

                        # Remove the IP from the lookup queue
                        del asn_parsed_results[ip]

                        # If this was LACNIC IP, reduce the total left count
                        if rir == 'lacnic':

                            lacnic_total_left -= 1

                        log.debug(
                            '{0} total lookups left, {1} LACNIC lookups left'
                            ''.format(str(len(asn_parsed_results)),
                                      str(lacnic_total_left))
                        )

                        # If this IP failed previously, remove it from the
                        # failed return dict
                        if (
                            ip in failed_lookups_dict.keys()
                        ):  # pragma: no cover

                            del failed_lookups_dict[ip]

                        # Break out of the IP list loop, we need to change to
                        # the next RIR
                        break

                    except HTTPLookupError:  # pragma: no cover

                        log.debug('Failed lookup for IP: {0} '
                                  'RIR: {1}'.format(ip, rir))

                        # Add the IP to the failed lookups dict if not there
                        if ip not in failed_lookups_dict.keys():

                            failed_lookups_dict[ip] = 1

                        # This IP has already failed at least once, increment
                        # the failure count until retry_count reached, then
                        # stop trying
                        else:

                            failed_lookups_dict[ip] += 1

                            if failed_lookups_dict[ip] == retry_count:

                                del asn_parsed_results[ip]
                                stats[rir]['failed'].append(ip)

                                if rir == 'lacnic':

                                    lacnic_total_left -= 1

                        # Since this IP failed, we don't break to move to next
                        # RIR, we check the next IP for this RIR
                        continue

                    except HTTPRateLimitError:  # pragma: no cover

                        # Add the IP to the rate-limited lookups dict if not
                        # there
                        if ip not in rated_lookups:

                            rated_lookups.append(ip)
                            stats[rir]['rate_limited'].append(ip)

                        log.debug('Rate limiting triggered for IP: {0} '
                                  'RIR: {1}'.format(ip, rir))

                        # Since rate-limit was reached, reset the timer and
                        # max out the count
                        rate_tracker[rir]['time'] = time.time()
                        rate_tracker[rir]['count'] = 9

                        # Break out of the IP list loop, we need to change to
                        # the next RIR
                        break

    return_tuple = namedtuple('return_tuple', ['results', 'stats'])
    return return_tuple(results, stats)

View file

@ -1,4 +1,4 @@
# Copyright (c) 2013, 2014, 2015, 2016 Philip Hane
# Copyright (c) 2013-2019 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@ -50,6 +50,45 @@ HR_ASN = {
'_short': 'ASN Registry',
'_name': 'ASN Assigned Registry',
'_description': 'ASN assigned regional internet registry.'
},
'asn_description': {
'_short': 'ASN Description',
'_name': 'ASN Description',
'_description': 'A brief description for the assigned ASN.'
}
}
# Human-readable (HR) labels for parsed ASN origin whois fields. Each leaf
# carries a '_short' (compact label), '_name' (full label), and '_description'
# (help text) used when rendering results for display.
HR_ASN_ORIGIN = {
    'nets': {
        '_short': 'Network',
        '_name': 'ASN Network',
        '_description': 'A network associated with an Autonomous System Number'
                        ' (ASN)',
        'cidr': {
            '_short': 'CIDR',
            '_name': 'Classless Inter-Domain Routing Block',
            '_description': 'The network routing block.'
        },
        'description': {
            '_short': 'Description',
            '_name': 'Description',
            '_description': 'Description for the registered network.'
        },
        'maintainer': {
            '_short': 'Maintainer',
            '_name': 'Maintainer',
            '_description': 'The entity that maintains the network.'
        },
        'updated': {
            '_short': 'Updated',
            '_name': 'Updated Timestamp',
            '_description': 'Network registration updated information.'
        },
        'source': {
            '_short': 'Source',
            '_name': 'ASN Network Information Source',
            '_description': 'The source of the network information.'
        }
    }
}
@ -353,3 +392,118 @@ HR_WHOIS = {
'_description': 'The referral whois data if referenced and enabled.',
}
}
# Human-readable (HR) labels for parsed NIR (National Internet Registry,
# i.e. JPNIC/KRNIC) whois fields. Same '_short'/'_name'/'_description'
# convention as the other HR_* mappings in this module.
HR_WHOIS_NIR = {
    'nets': {
        '_short': 'NIR Network',
        '_name': 'National Internet Registry Network',
        '_description': 'The assigned NIR (JPNIC, KRNIC) network for an IP '
                        'address. May be a parent or child network.',
        'address': {
            '_short': 'Address',
            '_name': 'Postal Address',
            '_description': 'The network contact postal address.'
        },
        'cidr': {
            '_short': 'CIDR Blocks',
            '_name': 'Classless Inter-Domain Routing Blocks',
            '_description': 'Network routing blocks an IP address belongs to.'
        },
        'country': {
            '_short': 'Country Code',
            '_name': 'Country Code',
            '_description': 'Country code registered for the network in '
                            'ISO 3166-1 format.'
        },
        'handle': {
            '_short': 'Handle',
            '_name': 'NIR Network Handle',
            '_description': 'Unique identifier for a registered NIR network.'
        },
        'name': {
            '_short': 'Name',
            '_name': 'NIR Network Name',
            '_description': 'The identifier assigned to the network '
                            'registration for an IP address.'
        },
        'postal_code': {
            '_short': 'Postal',
            '_name': 'Postal Code',
            '_description': 'The postal code registered with a NIR network.'
        },
        'range': {
            '_short': 'Ranges',
            '_name': 'CIDR Block Ranges',
            '_description': 'Network routing blocks an IP address belongs to.'
        },
        'nameservers': {
            '_short': 'NS',
            '_name': 'Nameservers',
            '_description': 'Nameservers associated with a NIR network.'
        },
        'created': {
            '_short': 'Created',
            '_name': 'Created Timestamp',
            '_description': 'The date the network was created in ISO 8601 '
                            'format.'
        },
        'updated': {
            '_short': 'Updated',
            '_name': 'Updated Timestamp',
            '_description': 'The date the network was updated in ISO 8601 '
                            'format.'
        },
        'contacts': {
            '_short': 'Contacts',
            '_name': 'NIR Contacts',
            '_description': 'The contacts (admin, tech) registered with a NIR '
                            'network.',
            'organization': {
                '_short': 'Org',
                '_name': 'Organization',
                '_description': 'The contact organization.'
            },
            'division': {
                '_short': 'Div',
                '_name': 'Division',
                '_description': 'The contact division of the organization.'
            },
            'name': {
                '_short': 'Name',
                '_name': 'Name',
                '_description': 'The contact name.'
            },
            'title': {
                '_short': 'Title',
                '_name': 'Title',
                '_description': 'The contact position or job title.'
            },
            'phone': {
                '_short': 'Phone',
                '_name': 'Phone Number',
                '_description': 'The contact phone number.'
            },
            'fax': {
                '_short': 'Fax',
                '_name': 'Fax Number',
                '_description': 'The contact fax number.'
            },
            'email': {
                '_short': 'Email',
                '_name': 'Email Address',
                '_description': 'The contact email address.'
            },
            'reply_email': {
                '_short': 'Reply Email',
                '_name': 'Reply Email Address',
                '_description': 'The contact reply email address.'
            },
            'updated': {
                '_short': 'Updated',
                '_name': 'Updated Timestamp',
                '_description': 'The date the contact was updated in ISO 8601 '
                                'format.'
            }
        }
    }
}

View file

@ -1,4 +1,4 @@
# Copyright (c) 2013, 2014, 2015, 2016 Philip Hane
# Copyright (c) 2013-2019 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@ -23,6 +23,8 @@
# POSSIBILITY OF SUCH DAMAGE.
from . import Net
from .asn import IPASN
from .nir import NIRWhois
import logging
log = logging.getLogger(__name__)
@ -34,22 +36,25 @@ class IPWhois:
IPv4 and IPv6 addresses.
Args:
address: An IPv4 or IPv6 address as a string, integer, IPv4Address, or
IPv6Address.
timeout: The default timeout for socket connections in seconds.
proxy_opener: The urllib.request.OpenerDirector request for proxy
support or None.
allow_permutations: allow net.Net() to use additional methods if DNS
lookups to Cymru fail.
address (:obj:`str`/:obj:`int`/:obj:`IPv4Address`/:obj:`IPv6Address`):
An IPv4 or IPv6 address
timeout (:obj:`int`): The default timeout for socket connections in
seconds. Defaults to 5.
proxy_opener (:obj:`urllib.request.OpenerDirector`): The request for
proxy support. Defaults to None.
allow_permutations (:obj:`bool`): Allow net.Net() to use additional
methods if DNS lookups to Cymru fail. *WARNING* deprecated in
favor of new argument asn_methods. Defaults to False.
"""
def __init__(self, address, timeout=5, proxy_opener=None,
allow_permutations=True):
allow_permutations=False):
self.net = Net(
address=address, timeout=timeout, proxy_opener=proxy_opener,
allow_permutations=allow_permutations
)
self.ipasn = IPASN(self.net)
self.address = self.net.address
self.timeout = self.net.timeout
@ -64,80 +69,101 @@ class IPWhois:
self.address_str, str(self.timeout), repr(self.net.opener)
)
def lookup(self, *args, **kwargs):
"""
Temporary wrapper for legacy whois lookups (moved to
IPWhois.lookup_whois()). This will be removed in a future
release (TBD).
"""
from warnings import warn
warn("IPWhois.lookup() has been deprecated and will be removed. "
"You should now use IPWhois.lookup_whois() for legacy whois "
"lookups.")
return self.lookup_whois(*args, **kwargs)
def lookup_whois(self, inc_raw=False, retry_count=3, get_referral=False,
extra_blacklist=None, ignore_referral_errors=False,
field_list=None, asn_alts=None, extra_org_map=None):
field_list=None, asn_alts=None, extra_org_map=None,
inc_nir=True, nir_field_list=None, asn_methods=None,
get_asn_description=True):
"""
The function for retrieving and parsing whois information for an IP
address via port 43 (WHOIS).
Args:
inc_raw: Boolean for whether to include the raw whois results in
the returned dictionary.
retry_count: The number of times to retry in case socket errors,
timeouts, connection resets, etc. are encountered.
get_referral: Boolean for whether to retrieve referral whois
information, if available.
extra_blacklist: A list of blacklisted whois servers in addition to
the global BLACKLIST.
ignore_referral_errors: Boolean for whether to ignore and continue
when an exception is encountered on referral whois lookups.
field_list: If provided, a list of fields to parse:
inc_raw (:obj:`bool`): Whether to include the raw whois results in
the returned dictionary. Defaults to False.
retry_count (:obj:`int`): The number of times to retry in case
socket errors, timeouts, connection resets, etc. are
encountered. Defaults to 3.
get_referral (:obj:`bool`): Whether to retrieve referral whois
information, if available. Defaults to False.
extra_blacklist (:obj:`list`): Blacklisted whois servers in
addition to the global BLACKLIST. Defaults to None.
ignore_referral_errors (:obj:`bool`): Whether to ignore and
continue when an exception is encountered on referral whois
lookups. Defaults to False.
field_list (:obj:`list`): If provided, a list of fields to parse:
['name', 'handle', 'description', 'country', 'state', 'city',
'address', 'postal_code', 'emails', 'created', 'updated']
asn_alts: Array of additional lookup types to attempt if the
If None, defaults to all.
asn_alts (:obj:`list`): Additional lookup types to attempt if the
ASN dns lookup fails. Allow permutations must be enabled.
Defaults to all ['whois', 'http'].
extra_org_map: Dictionary mapping org handles to RIRs. This is for
limited cases where ARIN REST (ASN fallback HTTP lookup) does
not show an RIR as the org handle e.g., DNIC (which is now the
built in ORG_MAP) e.g., {'DNIC': 'arin'}. Valid RIR values are
(note the case-sensitive - this is meant to match the REST
result): 'ARIN', 'RIPE', 'apnic', 'lacnic', 'afrinic'
If None, defaults to all ['whois', 'http']. *WARNING*
deprecated in favor of new argument asn_methods.
extra_org_map (:obj:`dict`): Dictionary mapping org handles to
RIRs. This is for limited cases where ARIN REST (ASN fallback
HTTP lookup) does not show an RIR as the org handle e.g., DNIC
(which is now the built in ORG_MAP) e.g., {'DNIC': 'arin'}.
Valid RIR values are (note the case-sensitive - this is meant
to match the REST result):
'ARIN', 'RIPE', 'apnic', 'lacnic', 'afrinic'
Defaults to None.
inc_nir (:obj:`bool`): Whether to retrieve NIR (National Internet
Registry) information, if registry is JPNIC (Japan) or KRNIC
(Korea). If True, extra network requests will be required.
If False, the information returned for JP or KR IPs is
severely restricted. Defaults to True.
nir_field_list (:obj:`list`): If provided and inc_nir, a list of
fields to parse:
['name', 'handle', 'country', 'address', 'postal_code',
'nameservers', 'created', 'updated', 'contacts']
If None, defaults to all.
asn_methods (:obj:`list`): ASN lookup types to attempt, in order.
If None, defaults to all ['dns', 'whois', 'http'].
get_asn_description (:obj:`bool`): Whether to run an additional
query when pulling ASN information via dns, in order to get
the ASN description. Defaults to True.
Returns:
Dictionary:
dict: The IP whois lookup results
:query: The IP address (String)
:asn: The Autonomous System Number (String)
:asn_date: The ASN Allocation date (String)
:asn_registry: The assigned ASN registry (String)
:asn_cidr: The assigned ASN CIDR (String)
:asn_country_code: The assigned ASN country code (String)
:nets: Dictionaries containing network information which consists
of the fields listed in the ipwhois.whois.RIR_WHOIS dictionary.
(List)
:raw: Raw whois results if the inc_raw parameter is True. (String)
:referral: Dictionary of referral whois information if get_referral
is True and the server isn't blacklisted. Consists of fields
listed in the ipwhois.whois.RWHOIS dictionary.
:raw_referral: Raw referral whois results if the inc_raw parameter
is True. (String)
::
{
'query' (str) - The IP address
'asn' (str) - The Autonomous System Number
'asn_date' (str) - The ASN Allocation date
'asn_registry' (str) - The assigned ASN registry
'asn_cidr' (str) - The assigned ASN CIDR
'asn_country_code' (str) - The assigned ASN country code
'asn_description' (str) - The ASN description
'nets' (list) - Dictionaries containing network
information which consists of the fields listed in the
ipwhois.whois.RIR_WHOIS dictionary.
'raw' (str) - Raw whois results if the inc_raw parameter
is True.
'referral' (dict) - Referral whois information if
get_referral is True and the server is not blacklisted.
Consists of fields listed in the ipwhois.whois.RWHOIS
dictionary.
'raw_referral' (str) - Raw referral whois results if the
inc_raw parameter is True.
'nir' (dict) - ipwhois.nir.NIRWhois() results if inc_nir
is True.
}
"""
from .whois import Whois
# Create the return dictionary.
results = {}
results = {'nir': None}
# Retrieve the ASN information.
log.debug('ASN lookup for {0}'.format(self.address_str))
asn_data, response = self.net.lookup_asn(
retry_count=retry_count, asn_alts=asn_alts,
extra_org_map=extra_org_map
asn_data = self.ipasn.lookup(
inc_raw=inc_raw, retry_count=retry_count, asn_alts=asn_alts,
extra_org_map=extra_org_map, asn_methods=asn_methods,
get_asn_description=get_asn_description
)
# Add the ASN information to the return dictionary.
@ -147,20 +173,42 @@ class IPWhois:
whois = Whois(self.net)
log.debug('WHOIS lookup for {0}'.format(self.address_str))
whois_data = whois.lookup(
inc_raw=inc_raw, retry_count=retry_count, response=response,
inc_raw=inc_raw, retry_count=retry_count, response=None,
get_referral=get_referral, extra_blacklist=extra_blacklist,
ignore_referral_errors=ignore_referral_errors, asn_data=asn_data,
field_list=field_list
)
# Add the RDAP information to the return dictionary.
# Add the WHOIS information to the return dictionary.
results.update(whois_data)
if inc_nir:
nir = None
if 'JP' == asn_data['asn_country_code']:
nir = 'jpnic'
elif 'KR' == asn_data['asn_country_code']:
nir = 'krnic'
if nir:
nir_whois = NIRWhois(self.net)
nir_data = nir_whois.lookup(
nir=nir, inc_raw=inc_raw, retry_count=retry_count,
response=None,
field_list=nir_field_list, is_offline=False
)
# Add the NIR information to the return dictionary.
results['nir'] = nir_data
return results
def lookup_rdap(self, inc_raw=False, retry_count=3, depth=0,
excluded_entities=None, bootstrap=False,
rate_limit_timeout=120, asn_alts=None, extra_org_map=None):
rate_limit_timeout=120, asn_alts=None, extra_org_map=None,
inc_nir=True, nir_field_list=None, asn_methods=None,
get_asn_description=True):
"""
The function for retrieving and parsing whois information for an IP
address via HTTP (RDAP).
@ -169,50 +217,84 @@ class IPWhois:
information to parse.**
Args:
inc_raw: Boolean for whether to include the raw whois results in
the returned dictionary.
retry_count: The number of times to retry in case socket errors,
timeouts, connection resets, etc. are encountered.
depth: How many levels deep to run queries when additional
referenced objects are found.
excluded_entities: A list of entity handles to not perform lookups.
bootstrap: If True, performs lookups via ARIN bootstrap rather
than lookups based on ASN data. ASN lookups are not performed
and no output for any of the asn* fields is provided.
rate_limit_timeout: The number of seconds to wait before retrying
when a rate limit notice is returned via rdap+json.
asn_alts: Array of additional lookup types to attempt if the
inc_raw (:obj:`bool`): Whether to include the raw whois results in
the returned dictionary. Defaults to False.
retry_count (:obj:`int`): The number of times to retry in case
socket errors, timeouts, connection resets, etc. are
encountered. Defaults to 3.
depth (:obj:`int`): How many levels deep to run queries when
additional referenced objects are found. Defaults to 0.
excluded_entities (:obj:`list`): Entity handles to not perform
lookups. Defaults to None.
bootstrap (:obj:`bool`): If True, performs lookups via ARIN
bootstrap rather than lookups based on ASN data. ASN lookups
are not performed and no output for any of the asn* fields is
provided. Defaults to False.
rate_limit_timeout (:obj:`int`): The number of seconds to wait
before retrying when a rate limit notice is returned via
rdap+json. Defaults to 120.
asn_alts (:obj:`list`): Additional lookup types to attempt if the
ASN dns lookup fails. Allow permutations must be enabled.
Defaults to all ['whois', 'http'].
extra_org_map: Dictionary mapping org handles to RIRs. This is for
limited cases where ARIN REST (ASN fallback HTTP lookup) does
not show an RIR as the org handle e.g., DNIC (which is now the
built in ORG_MAP) e.g., {'DNIC': 'arin'}. Valid RIR values are
(note the case-sensitive - this is meant to match the REST
result): 'ARIN', 'RIPE', 'apnic', 'lacnic', 'afrinic'
If None, defaults to all ['whois', 'http']. *WARNING*
deprecated in favor of new argument asn_methods.
extra_org_map (:obj:`dict`): Dictionary mapping org handles to
RIRs. This is for limited cases where ARIN REST (ASN fallback
HTTP lookup) does not show an RIR as the org handle e.g., DNIC
(which is now the built in ORG_MAP) e.g., {'DNIC': 'arin'}.
Valid RIR values are (note the case-sensitive - this is meant
to match the REST result):
'ARIN', 'RIPE', 'apnic', 'lacnic', 'afrinic'
Defaults to None.
inc_nir (:obj:`bool`): Whether to retrieve NIR (National Internet
Registry) information, if registry is JPNIC (Japan) or KRNIC
(Korea). If True, extra network requests will be required.
If False, the information returned for JP or KR IPs is
severely restricted. Defaults to True.
nir_field_list (:obj:`list`): If provided and inc_nir, a list of
fields to parse:
['name', 'handle', 'country', 'address', 'postal_code',
'nameservers', 'created', 'updated', 'contacts']
If None, defaults to all.
asn_methods (:obj:`list`): ASN lookup types to attempt, in order.
If None, defaults to all ['dns', 'whois', 'http'].
get_asn_description (:obj:`bool`): Whether to run an additional
query when pulling ASN information via dns, in order to get
the ASN description. Defaults to True.
Returns:
Dictionary:
dict: The IP RDAP lookup results
:query: The IP address (String)
:asn: The Autonomous System Number (String)
:asn_date: The ASN Allocation date (String)
:asn_registry: The assigned ASN registry (String)
:asn_cidr: The assigned ASN CIDR (String)
:asn_country_code: The assigned ASN country code (String)
:entities: List of entity handles referred by the top level query.
:network: Dictionary containing network information which consists
of the fields listed in the ipwhois.rdap._RDAPNetwork dict.
:objects: Dictionary of (entity handle: entity dict) which consists
of the fields listed in the ipwhois.rdap._RDAPEntity dict.
:raw: (Dictionary) - Whois results in json format if the inc_raw
parameter is True.
::
{
'query' (str) - The IP address
'asn' (str) - The Autonomous System Number
'asn_date' (str) - The ASN Allocation date
'asn_registry' (str) - The assigned ASN registry
'asn_cidr' (str) - The assigned ASN CIDR
'asn_country_code' (str) - The assigned ASN country code
'asn_description' (str) - The ASN description
'entities' (list) - Entity handles referred by the top
level query.
'network' (dict) - Network information which consists of
the fields listed in the ipwhois.rdap._RDAPNetwork
dict.
'objects' (dict) - Mapping of entity handle->entity dict
which consists of the fields listed in the
ipwhois.rdap._RDAPEntity dict. The raw result is
included for each object if the inc_raw parameter
is True.
'raw' (dict) - Whois results in json format if the inc_raw
parameter is True.
'nir' (dict) - ipwhois.nir.NIRWhois results if inc_nir is
True.
}
"""
from .rdap import RDAP
# Create the return dictionary.
results = {}
results = {'nir': None}
asn_data = None
response = None
@ -220,9 +302,10 @@ class IPWhois:
# Retrieve the ASN information.
log.debug('ASN lookup for {0}'.format(self.address_str))
asn_data, asn_response = self.net.lookup_asn(
retry_count=retry_count, asn_alts=asn_alts,
extra_org_map=extra_org_map
asn_data = self.ipasn.lookup(
inc_raw=inc_raw, retry_count=retry_count, asn_alts=asn_alts,
extra_org_map=extra_org_map, asn_methods=asn_methods,
get_asn_description=get_asn_description
)
# Add the ASN information to the return dictionary.
@ -241,4 +324,23 @@ class IPWhois:
# Add the RDAP information to the return dictionary.
results.update(rdap_data)
if inc_nir:
nir = None
if 'JP' == asn_data['asn_country_code']:
nir = 'jpnic'
elif 'KR' == asn_data['asn_country_code']:
nir = 'krnic'
if nir:
nir_whois = NIRWhois(self.net)
nir_data = nir_whois.lookup(
nir=nir, inc_raw=inc_raw, retry_count=retry_count,
response=None,
field_list=nir_field_list, is_offline=False
)
# Add the NIR information to the return dictionary.
results['nir'] = nir_data
return results

View file

@ -1,4 +1,4 @@
# Copyright (c) 2013, 2014, 2015, 2016 Philip Hane
# Copyright (c) 2013-2019 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@ -26,46 +26,44 @@ import sys
import socket
import dns.resolver
import json
from collections import namedtuple
import logging
from time import sleep
# Import the dnspython3 rdtypes to fix the dynamic import problem when frozen.
# Import the dnspython rdtypes to fix the dynamic import problem when frozen.
import dns.rdtypes.ANY.TXT # @UnusedImport
from .exceptions import (IPDefinedError, ASNRegistryError, ASNLookupError,
BlacklistError, WhoisLookupError, HTTPLookupError,
HostLookupError, HTTPRateLimitError)
from .exceptions import (IPDefinedError, ASNLookupError, BlacklistError,
WhoisLookupError, HTTPLookupError, HostLookupError,
HTTPRateLimitError, WhoisRateLimitError)
from .whois import RIR_WHOIS
from .asn import ASN_ORIGIN_WHOIS
from .utils import ipv4_is_defined, ipv6_is_defined
if sys.version_info >= (3, 3): # pragma: no cover
from ipaddress import (ip_address,
IPv4Address,
IPv6Address,
ip_network,
summarize_address_range,
collapse_addresses)
IPv6Address)
else: # pragma: no cover
from ipaddr import (IPAddress as ip_address,
IPv4Address,
IPv6Address,
IPNetwork as ip_network,
summarize_address_range,
collapse_address_list as collapse_addresses)
IPv6Address)
try: # pragma: no cover
from urllib.request import (OpenerDirector,
ProxyHandler,
build_opener,
Request,
URLError)
URLError,
HTTPError)
from urllib.parse import urlencode
except ImportError: # pragma: no cover
from urllib2 import (OpenerDirector,
ProxyHandler,
build_opener,
Request,
URLError)
URLError,
HTTPError)
from urllib import urlencode
log = logging.getLogger(__name__)
@ -73,22 +71,6 @@ log = logging.getLogger(__name__)
# POSSIBLY UPDATE TO USE RDAP
ARIN = 'http://whois.arin.net/rest/nets;q={0}?showDetails=true&showARIN=true'
# National Internet Registry
NIR = {
'jpnic': {
'url': ('http://whois.nic.ad.jp/cgi-bin/whois_gw?lang=%2Fe&key={0}'
'&submit=query'),
'request_type': 'GET',
'request_headers': {'Accept': 'text/html'}
},
'krnic': {
'url': 'http://whois.kisa.or.kr/eng/whois.jsc',
'request_type': 'POST',
'request_headers': {'Accept': 'text/html'},
'form_data_ip_field': 'query'
}
}
CYMRU_WHOIS = 'whois.cymru.com'
IPV4_DNS_ZONE = '{0}.origin.asn.cymru.com'
@ -115,12 +97,15 @@ class Net:
The class for performing network queries.
Args:
address: An IPv4 or IPv6 address in string format.
timeout: The default timeout for socket connections in seconds.
proxy_opener: The urllib.request.OpenerDirector request for proxy
support or None.
allow_permutations: Use additional methods if DNS lookups to Cymru
fail.
address (:obj:`str`/:obj:`int`/:obj:`IPv4Address`/:obj:`IPv6Address`):
An IPv4 or IPv6 address
timeout (:obj:`int`): The default timeout for socket connections in
seconds. Defaults to 5.
proxy_opener (:obj:`urllib.request.OpenerDirector`): The request for
proxy support. Defaults to None.
allow_permutations (:obj:`bool`): Allow net.Net() to use additional
methods if DNS lookups to Cymru fail. *WARNING* deprecated in
favor of new argument asn_methods. Defaults to False.
Raises:
IPDefinedError: The address provided is defined (does not need to be
@ -128,7 +113,7 @@ class Net:
"""
def __init__(self, address, timeout=5, proxy_opener=None,
allow_permutations=True):
allow_permutations=False):
# IPv4Address or IPv6Address
if isinstance(address, IPv4Address) or isinstance(
@ -147,6 +132,13 @@ class Net:
# Allow other than DNS lookups for ASNs.
self.allow_permutations = allow_permutations
if self.allow_permutations:
from warnings import warn
warn('allow_permutations has been deprecated and will be removed. '
'It is no longer needed, due to the deprecation of asn_alts, '
'and the addition of the asn_methods argument.')
self.dns_resolver = dns.resolver.Resolver()
self.dns_resolver.timeout = timeout
self.dns_resolver.lifetime = timeout
@ -227,59 +219,38 @@ class Net:
self.dns_zone = IPV6_DNS_ZONE.format(self.reversed)
def get_asn_dns(self, result=None):
def lookup_asn(self, *args, **kwargs):
    """
    Deprecated pass-through for IP ASN lookups.

    The implementation now lives in asn.IPASN.lookup(); this shim will
    be removed in a future release.
    """

    from warnings import warn
    warn('Net.lookup_asn() has been deprecated and will be removed. '
         'You should now use asn.IPASN.lookup() for IP ASN lookups.')

    from .asn import IPASN

    # The legacy API returned a (result, response) tuple; the raw
    # response is no longer tracked here, so it is always None.
    result = IPASN(self).lookup(*args, **kwargs)
    return result, None
def get_asn_dns(self):
"""
The function for retrieving ASN information for an IP address from
Cymru via port 53 (DNS).
Args:
result: Optional result object. This bypasses the ASN lookup.
Returns:
Dictionary: A dictionary containing the following keys:
asn (String) - The Autonomous System Number.
asn_date (String) - The ASN Allocation date.
asn_registry (String) - The assigned ASN registry.
asn_cidr (String) - The assigned ASN CIDR.
asn_country_code (String) - The assigned ASN country code.
list: The raw ASN data.
Raises:
ASNRegistryError: The ASN registry is not known.
ASNLookupError: The ASN lookup failed.
"""
try:
if result is None:
log.debug('ASN query for {0}'.format(self.dns_zone))
data = self.dns_resolver.query(self.dns_zone, 'TXT')
temp = str(data[0]).split('|')
else:
temp = result
# Parse out the ASN information.
ret = {'asn_registry': temp[3].strip(' \n')}
if ret['asn_registry'] not in RIR_WHOIS.keys():
raise ASNRegistryError(
'ASN registry {0} is not known.'.format(
ret['asn_registry'])
)
ret['asn'] = temp[0].strip(' "\n')
ret['asn_cidr'] = temp[1].strip(' \n')
ret['asn_country_code'] = temp[2].strip(' \n').upper()
ret['asn_date'] = temp[4].strip(' "\n')
return ret
except ASNRegistryError:
raise
log.debug('ASN query for {0}'.format(self.dns_zone))
data = self.dns_resolver.query(self.dns_zone, 'TXT')
return list(data)
except (dns.resolver.NXDOMAIN, dns.resolver.NoNameservers,
dns.resolver.NoAnswer, dns.exception.Timeout) as e:
@ -289,85 +260,98 @@ class Net:
e.__class__.__name__, self.address_str)
)
except:
except: # pragma: no cover
raise ASNLookupError(
'ASN lookup failed for {0}.'.format(self.address_str)
)
def get_asn_whois(self, retry_count=3, result=None):
def get_asn_verbose_dns(self, asn=None):
    """
    The function for retrieving the information for an ASN from
    Cymru via port 53 (DNS). This is needed since IP to ASN mapping via
    Cymru DNS does not return the ASN Description like Cymru Whois does.

    Args:
        asn (:obj:`str`/:obj:`int`): The AS number (required). A bare
            number (with or without the 'AS' prefix) is accepted.

    Returns:
        str: The raw ASN data.

    Raises:
        ASNLookupError: The ASN lookup failed.
    """

    # Coerce to str so integer AS numbers do not raise TypeError on the
    # slice below.
    asn = str(asn)
    if asn[0:2] != 'AS':

        asn = 'AS{0}'.format(asn)

    zone = '{0}.asn.cymru.com'.format(asn)

    try:

        log.debug('ASN verbose query for {0}'.format(zone))
        data = self.dns_resolver.query(zone, 'TXT')
        return str(data[0])

    except (dns.resolver.NXDOMAIN, dns.resolver.NoNameservers,
            dns.resolver.NoAnswer, dns.exception.Timeout) as e:

        raise ASNLookupError(
            'ASN lookup failed (DNS {0}) for {1}.'.format(
                e.__class__.__name__, asn)
        )

    except:  # pragma: no cover

        raise ASNLookupError(
            'ASN lookup failed for {0}.'.format(asn)
        )
def get_asn_whois(self, retry_count=3):
"""
The function for retrieving ASN information for an IP address from
Cymru via port 43/tcp (WHOIS).
Args:
retry_count: The number of times to retry in case socket errors,
timeouts, connection resets, etc. are encountered.
result: Optional result object. This bypasses the ASN lookup.
retry_count (:obj:`int`): The number of times to retry in case
socket errors, timeouts, connection resets, etc. are
encountered. Defaults to 3.
Returns:
Dictionary: A dictionary containing the following keys:
asn (String) - The Autonomous System Number.
asn_date (String) - The ASN Allocation date.
asn_registry (String) - The assigned ASN registry.
asn_cidr (String) - The assigned ASN CIDR.
asn_country_code (String) - The assigned ASN country code.
str: The raw ASN data.
Raises:
ASNRegistryError: The ASN registry is not known.
ASNLookupError: The ASN lookup failed.
"""
try:
if result is None:
# Create the connection for the Cymru whois query.
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.settimeout(self.timeout)
log.debug('ASN query for {0}'.format(self.address_str))
conn.connect((CYMRU_WHOIS, 43))
# Create the connection for the Cymru whois query.
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.settimeout(self.timeout)
log.debug('ASN query for {0}'.format(self.address_str))
conn.connect((CYMRU_WHOIS, 43))
# Query the Cymru whois server, and store the results.
conn.send((
' -r -a -c -p -f {0}{1}'.format(
self.address_str, '\r\n')
).encode())
# Query the Cymru whois server, and store the results.
conn.send((
' -r -a -c -p -f -o {0}{1}'.format(
self.address_str, '\r\n')
).encode())
data = ''
while True:
data = ''
while True:
d = conn.recv(4096).decode()
data += d
d = conn.recv(4096).decode()
data += d
if not d:
if not d:
break
break
conn.close()
conn.close()
else:
data = result
# Parse out the ASN information.
temp = str(data).split('|')
ret = {'asn_registry': temp[4].strip(' \n')}
if ret['asn_registry'] not in RIR_WHOIS.keys():
raise ASNRegistryError(
'ASN registry {0} is not known.'.format(
ret['asn_registry'])
)
ret['asn'] = temp[0].strip(' \n')
ret['asn_cidr'] = temp[2].strip(' \n')
ret['asn_country_code'] = temp[3].strip(' \n').upper()
ret['asn_date'] = temp[5].strip(' \n')
return ret
return str(data)
except (socket.timeout, socket.error) as e: # pragma: no cover
@ -384,17 +368,13 @@ class Net:
'ASN lookup failed for {0}.'.format(self.address_str)
)
except ASNRegistryError:
raise
except:
except: # pragma: no cover
raise ASNLookupError(
'ASN lookup failed for {0}.'.format(self.address_str)
)
def get_asn_http(self, retry_count=3, result=None, extra_org_map=None):
def get_asn_http(self, retry_count=3):
"""
The function for retrieving ASN information for an IP address from
Arin via port 80 (HTTP). Currently limited to fetching asn_registry
@ -403,94 +383,29 @@ class Net:
chance fallback call behind ASN DNS & ASN Whois lookups.
Args:
retry_count: The number of times to retry in case socket errors,
timeouts, connection resets, etc. are encountered.
result: Optional result object. This bypasses the ASN lookup.
extra_org_map: Dictionary mapping org handles to RIRs. This is for
limited cases where ARIN REST (ASN fallback HTTP lookup) does
not show an RIR as the org handle e.g., DNIC (which is now the
built in ORG_MAP) e.g., {'DNIC': 'arin'}. Valid RIR values are
(note the case-sensitive - this is meant to match the REST
result): 'ARIN', 'RIPE', 'apnic', 'lacnic', 'afrinic'
retry_count (:obj:`int`): The number of times to retry in case
socket errors, timeouts, connection resets, etc. are
encountered. Defaults to 3.
Returns:
Dictionary: A dictionary containing the following keys:
asn (String) - None, can't retrieve with this method.
asn_date (String) - None, can't retrieve with this method.
asn_registry (String) - The assigned ASN registry.
asn_cidr (String) - None, can't retrieve with this method.
asn_country_code (String) - None, can't retrieve with this
method.
dict: The ASN data in json format.
Raises:
ASNRegistryError: The ASN registry is not known.
ASNLookupError: The ASN lookup failed.
"""
# Set the org_map. Map the orgRef handle to an RIR.
org_map = ORG_MAP.copy()
try:
org_map.update(extra_org_map)
except (TypeError, ValueError, IndexError, KeyError):
pass
try:
if result is None:
# Lets attempt to get the ASN registry information from
# ARIN.
log.debug('ASN query for {0}'.format(self.address_str))
response = self.get_http_json(
url=str(ARIN).format(self.address_str),
retry_count=retry_count,
headers={'Accept': 'application/json'}
# Lets attempt to get the ASN registry information from
# ARIN.
log.debug('ASN query for {0}'.format(self.address_str))
response = self.get_http_json(
url=str(ARIN).format(self.address_str),
retry_count=retry_count,
headers={'Accept': 'application/json'}
)
else:
response = result
asn_data = {
'asn_registry': None,
'asn': None,
'asn_cidr': None,
'asn_country_code': None,
'asn_date': None
}
try:
net_list = response['nets']['net']
if not isinstance(net_list, list):
net_list = [net_list]
except (KeyError, TypeError):
log.debug('No networks found')
net_list = []
for n in net_list:
try:
asn_data['asn_registry'] = (
org_map[n['orgRef']['@handle'].upper()]
)
except KeyError as e:
log.debug('Could not parse ASN registry via HTTP: '
'{0}'.format(str(e)))
raise ASNRegistryError('ASN registry lookup failed.')
break
return asn_data
return response
except (socket.timeout, socket.error) as e: # pragma: no cover
@ -507,16 +422,124 @@ class Net:
'ASN lookup failed for {0}.'.format(self.address_str)
)
except ASNRegistryError:
raise
except:
raise ASNLookupError(
'ASN lookup failed for {0}.'.format(self.address_str)
)
def get_asn_origin_whois(self, asn_registry='radb', asn=None,
                         retry_count=3, server=None, port=43):
    """
    The function for retrieving CIDR info for an ASN via whois.

    Args:
        asn_registry (:obj:`str`): The source to run the query against
            (asn.ASN_ORIGIN_WHOIS).
        asn (:obj:`str`): The AS number (required).
        retry_count (:obj:`int`): The number of times to retry in case
            socket errors, timeouts, connection resets, etc. are
            encountered. Defaults to 3.
        server (:obj:`str`): An optional server to connect to.
        port (:obj:`int`): The network port to connect on. Defaults to 43.

    Returns:
        str: The raw ASN origin whois data.

    Raises:
        WhoisLookupError: The ASN origin whois lookup failed.
        WhoisRateLimitError: The ASN origin Whois request rate limited and
            retries were exhausted.
    """

    try:

        if server is None:

            # No explicit server given: resolve it from the registry
            # configuration table.
            server = ASN_ORIGIN_WHOIS[asn_registry]['server']

        # Create the connection for the whois query.
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        conn.settimeout(self.timeout)
        log.debug('ASN origin WHOIS query for {0} at {1}:{2}'.format(
            asn, server, port))
        conn.connect((server, port))

        # Prep the query.
        query = ' -i origin {0}{1}'.format(asn, '\r\n')

        # Query the whois server, and store the results.
        conn.send(query.encode())

        # Read until the server closes the connection (empty recv).
        response = ''
        while True:

            d = conn.recv(4096).decode()

            response += d

            if not d:

                break

        conn.close()

        # TODO: this was taken from get_whois(). Need to test rate limiting
        if 'Query rate limit exceeded' in response:  # pragma: no cover

            if retry_count > 0:

                # Wait briefly, then retry recursively with one fewer
                # retry remaining.
                log.debug('ASN origin WHOIS query rate limit exceeded. '
                          'Waiting...')
                sleep(1)
                return self.get_asn_origin_whois(
                    asn_registry=asn_registry, asn=asn,
                    retry_count=retry_count-1,
                    server=server, port=port
                )

            else:

                raise WhoisRateLimitError(
                    'ASN origin Whois lookup failed for {0}. Rate limit '
                    'exceeded, wait and try again (possibly a '
                    'temporary block).'.format(asn))

        elif ('error 501' in response or 'error 230' in response
              ):  # pragma: no cover

            # Server-side query errors are surfaced as ValueError, which
            # the generic handler below converts to WhoisLookupError.
            log.debug('ASN origin WHOIS query error: {0}'.format(response))
            raise ValueError

        return str(response)

    except (socket.timeout, socket.error) as e:

        log.debug('ASN origin WHOIS query socket error: {0}'.format(e))

        if retry_count > 0:

            log.debug('ASN origin WHOIS query retrying (count: {0})'
                      ''.format(str(retry_count)))
            return self.get_asn_origin_whois(
                asn_registry=asn_registry, asn=asn,
                retry_count=retry_count-1, server=server, port=port
            )

        else:

            raise WhoisLookupError(
                'ASN origin WHOIS lookup failed for {0}.'.format(asn)
            )

    except WhoisRateLimitError:  # pragma: no cover

        raise

    except:  # pragma: no cover

        raise WhoisLookupError(
            'ASN origin WHOIS lookup failed for {0}.'.format(asn)
        )
def get_whois(self, asn_registry='arin', retry_count=3, server=None,
port=43, extra_blacklist=None):
"""
@ -524,22 +547,26 @@ class Net:
address via any port. Defaults to port 43/tcp (WHOIS).
Args:
asn_registry: The NIC to run the query against.
retry_count: The number of times to retry in case socket errors,
timeouts, connection resets, etc. are encountered.
server: An optional server to connect to. If provided, asn_registry
will be ignored.
port: The network port to connect on.
extra_blacklist: A list of blacklisted whois servers in addition to
the global BLACKLIST.
asn_registry (:obj:`str`): The NIC to run the query against.
Defaults to 'arin'.
retry_count (:obj:`int`): The number of times to retry in case
socket errors, timeouts, connection resets, etc. are
encountered. Defaults to 3.
server (:obj:`str`): An optional server to connect to. If
provided, asn_registry will be ignored.
port (:obj:`int`): The network port to connect on. Defaults to 43.
extra_blacklist (:obj:`list` of :obj:`str`): Blacklisted whois
servers in addition to the global BLACKLIST. Defaults to None.
Returns:
String: The raw whois data.
str: The raw whois data.
Raises:
BlacklistError: Raised if the whois server provided is in the
global BLACKLIST or extra_blacklist.
WhoisLookupError: The whois lookup failed.
WhoisRateLimitError: The Whois request rate limited and retries
were exhausted.
"""
try:
@ -585,12 +612,22 @@ class Net:
if 'Query rate limit exceeded' in response: # pragma: no cover
log.debug('WHOIS query rate limit exceeded. Waiting...')
sleep(1)
return self.get_whois(
asn_registry=asn_registry, retry_count=retry_count-1,
server=server, port=port, extra_blacklist=extra_blacklist
)
if retry_count > 0:
log.debug('WHOIS query rate limit exceeded. Waiting...')
sleep(1)
return self.get_whois(
asn_registry=asn_registry, retry_count=retry_count-1,
server=server, port=port,
extra_blacklist=extra_blacklist
)
else:
raise WhoisRateLimitError(
'Whois lookup failed for {0}. Rate limit '
'exceeded, wait and try again (possibly a '
'temporary block).'.format(self.address_str))
elif ('error 501' in response or 'error 230' in response
): # pragma: no cover
@ -618,6 +655,10 @@ class Net:
'WHOIS lookup failed for {0}.'.format(self.address_str)
)
except WhoisRateLimitError: # pragma: no cover
raise
except BlacklistError:
raise
@ -634,16 +675,18 @@ class Net:
The function for retrieving a json result via HTTP.
Args:
url: The URL to retrieve.
retry_count: The number of times to retry in case socket errors,
timeouts, connection resets, etc. are encountered.
rate_limit_timeout: The number of seconds to wait before retrying
when a rate limit notice is returned via rdap+json.
headers: The HTTP headers dictionary. The Accept header defaults
to 'application/rdap+json'.
url (:obj:`str`): The URL to retrieve (required).
retry_count (:obj:`int`): The number of times to retry in case
socket errors, timeouts, connection resets, etc. are
encountered. Defaults to 3.
rate_limit_timeout (:obj:`int`): The number of seconds to wait
before retrying when a rate limit notice is returned via
rdap+json or HTTP error 429. Defaults to 60.
headers (:obj:`dict`): The HTTP headers. The Accept header
defaults to 'application/rdap+json'.
Returns:
Dictionary: The data in json format.
dict: The data in json format.
Raises:
HTTPLookupError: The HTTP lookup failed.
@ -695,17 +738,36 @@ class Net:
return d
except HTTPError as e: # pragma: no cover
# RIPE is producing this HTTP error rather than a JSON error.
if e.code == 429:
log.debug('HTTP query rate limit exceeded.')
if retry_count > 0:
log.debug('Waiting {0} seconds...'.format(
str(rate_limit_timeout)))
sleep(rate_limit_timeout)
return self.get_http_json(
url=url, retry_count=retry_count - 1,
rate_limit_timeout=rate_limit_timeout,
headers=headers
)
else:
raise HTTPRateLimitError(
'HTTP lookup failed for {0}. Rate limit '
'exceeded, wait and try again (possibly a '
'temporary block).'.format(url))
else:
raise HTTPLookupError('HTTP lookup failed for {0} with error '
'code {1}.'.format(url, str(e.code)))
except (URLError, socket.timeout, socket.error) as e:
# Check needed for Python 2.6, also why URLError is caught.
try: # pragma: no cover
if not isinstance(e.reason, (socket.timeout, socket.error)):
raise HTTPLookupError('HTTP lookup failed for {0}.'
''.format(url))
except AttributeError: # pragma: no cover
pass
log.debug('HTTP query socket error: {0}'.format(e))
if retry_count > 0:
@ -735,11 +797,17 @@ class Net:
The function for retrieving host information for an IP address.
Args:
retry_count: The number of times to retry in case socket errors,
timeouts, connection resets, etc. are encountered.
retry_count (:obj:`int`): The number of times to retry in case
socket errors, timeouts, connection resets, etc. are
encountered. Defaults to 3.
Returns:
Tuple: hostname, aliaslist, ipaddrlist
namedtuple:
:hostname (str): The hostname returned mapped to the given IP
address.
:aliaslist (list): Alternate names for the given IP address.
:ipaddrlist (list): IPv4/v6 addresses mapped to the same hostname.
Raises:
HostLookupError: The host lookup failed.
@ -760,7 +828,9 @@ class Net:
socket.setdefaulttimeout(None)
return ret
results = namedtuple('get_host_results', 'hostname, aliaslist, '
'ipaddrlist')
return results(ret)
except (socket.timeout, socket.error) as e:
@ -784,110 +854,24 @@ class Net:
'Host lookup failed for {0}.'.format(self.address_str)
)
def lookup_asn(self, retry_count=3, asn_alts=None, extra_org_map=None):
"""
The wrapper function for retrieving and parsing ASN information for an
IP address.
Args:
retry_count: The number of times to retry in case socket errors,
timeouts, connection resets, etc. are encountered.
asn_alts: Array of additional lookup types to attempt if the
ASN dns lookup fails. Allow permutations must be enabled.
Defaults to all ['whois', 'http'].
extra_org_map: Dictionary mapping org handles to RIRs. This is for
limited cases where ARIN REST (ASN fallback HTTP lookup) does
not show an RIR as the org handle e.g., DNIC (which is now the
built in ORG_MAP) e.g., {'DNIC': 'arin'}. Valid RIR values are
(note the case-sensitive - this is meant to match the REST
result): 'ARIN', 'RIPE', 'apnic', 'lacnic', 'afrinic'
Returns:
Tuple:
:Dictionary: Result from get_asn_dns() or get_asn_whois().
:Dictionary: The response returned by get_asn_dns() or
get_asn_whois().
Raises:
ASNRegistryError: ASN registry does not match.
HTTPLookupError: The HTTP lookup failed.
"""
lookups = asn_alts if asn_alts is not None else ['whois', 'http']
# Initialize the response.
response = None
# Attempt to resolve ASN info via Cymru. DNS is faster, try that first.
try:
self.dns_resolver.lifetime = self.dns_resolver.timeout * (
retry_count and retry_count or 1)
asn_data = self.get_asn_dns()
except (ASNLookupError, ASNRegistryError) as e:
if not self.allow_permutations:
raise ASNRegistryError('ASN registry lookup failed. '
'Permutations not allowed.')
try:
if 'whois' in lookups:
log.debug('ASN DNS lookup failed, trying ASN WHOIS: '
'{0}'.format(e))
asn_data = self.get_asn_whois(retry_count)
else:
raise ASNLookupError
except (ASNLookupError, ASNRegistryError): # pragma: no cover
if 'http' in lookups:
# Lets attempt to get the ASN registry information from
# ARIN.
log.debug('ASN WHOIS lookup failed, trying ASN via HTTP')
try:
asn_data = self.get_asn_http(
retry_count=retry_count,
extra_org_map=extra_org_map
)
except ASNRegistryError:
raise ASNRegistryError('ASN registry lookup failed.')
except ASNLookupError:
raise HTTPLookupError('ASN HTTP lookup failed.')
else:
raise ASNRegistryError('ASN registry lookup failed.')
return asn_data, response
def get_http_raw(self, url=None, retry_count=3, headers=None,
request_type='GET', form_data=None):
"""
The function for retrieving a raw HTML result via HTTP.
Args:
url: The URL to retrieve.
retry_count: The number of times to retry in case socket errors,
timeouts, connection resets, etc. are encountered.
headers: The HTTP headers dictionary. The Accept header defaults
to 'application/rdap+json'.
request_type: 'GET' or 'POST'
form_data: Dictionary of form POST data
url (:obj:`str`): The URL to retrieve (required).
retry_count (:obj:`int`): The number of times to retry in case
socket errors, timeouts, connection resets, etc. are
encountered. Defaults to 3.
headers (:obj:`dict`): The HTTP headers. The Accept header
defaults to 'text/html'.
request_type (:obj:`str`): Request type 'GET' or 'POST'. Defaults
to 'GET'.
form_data (:obj:`dict`): Optional form POST data.
Returns:
String: The raw data.
str: The raw data.
Raises:
HTTPLookupError: The HTTP lookup failed.
@ -896,10 +880,12 @@ class Net:
if headers is None:
headers = {'Accept': 'text/html'}
enc_form_data = None
if form_data:
form_data = urlencode(form_data)
enc_form_data = urlencode(form_data)
try:
form_data = bytes(form_data, encoding='ascii')
# Py 2 inspection will alert on the encoding arg, no harm done.
enc_form_data = bytes(enc_form_data, encoding='ascii')
except TypeError: # pragma: no cover
pass
@ -909,10 +895,11 @@ class Net:
log.debug('HTTP query for {0} at {1}'.format(
self.address_str, url))
try:
conn = Request(url=url, data=form_data, headers=headers,
method=request_type)
# Py 2 inspection alert bypassed by using kwargs dict.
conn = Request(url=url, data=enc_form_data, headers=headers,
**{'method': request_type})
except TypeError: # pragma: no cover
conn = Request(url=url, data=form_data, headers=headers)
conn = Request(url=url, data=enc_form_data, headers=headers)
data = self.opener.open(conn, timeout=self.timeout)
try:
@ -924,15 +911,6 @@ class Net:
except (URLError, socket.timeout, socket.error) as e:
# Check needed for Python 2.6, also why URLError is caught.
try: # pragma: no cover
if not isinstance(e.reason, (socket.timeout, socket.error)):
raise HTTPLookupError('HTTP lookup failed for {0}.'
''.format(url))
except AttributeError: # pragma: no cover
pass
log.debug('HTTP query socket error: {0}'.format(e))
if retry_count > 0:

682
lib/ipwhois/nir.py Normal file
View file

@ -0,0 +1,682 @@
# Copyright (c) 2013-2019 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from . import NetError
from .utils import unique_everseen
import logging
import sys
import re
import copy
from datetime import (datetime, timedelta)
if sys.version_info >= (3, 3): # pragma: no cover
from ipaddress import (ip_address,
ip_network,
summarize_address_range,
collapse_addresses)
else: # pragma: no cover
from ipaddr import (IPAddress as ip_address,
IPNetwork as ip_network,
summarize_address_range,
collapse_address_list as collapse_addresses)
log = logging.getLogger(__name__)
# Base NIR whois output dictionary.
# Skeleton for one parsed NIR network result. The parsing methods fill
# these keys in from regex matches; anything not found stays None.
BASE_NET = {
    'cidr': None,         # Network routing block(s), CIDR notation
    'name': None,         # Organization/network name
    'handle': None,       # Network handle/service name
    'range': None,        # 'start - end' address range string
    'country': None,      # Country code (presumably from NIR config — verify)
    'address': None,      # Postal address
    'postal_code': None,  # Postal/zip code
    'nameservers': None,  # List of nameservers (see parse_fields())
    'created': None,      # Creation date, ISO 8601 after parsing
    'updated': None,      # Last-updated date, ISO 8601 after parsing
    'contacts': None      # {'admin': ..., 'tech': ...} contact dicts
}
# Base NIR whois contact output dictionary.
# Skeleton for one parsed NIR contact. Keys mirror the per-NIR
# 'contact_fields' regex maps in NIR_WHOIS; unmatched fields stay None.
BASE_CONTACT = {
    'name': None,          # Contact name
    'email': None,         # Contact e-mail address
    'reply_email': None,   # Reply-to e-mail (jpnic only)
    'organization': None,  # Organization name (jpnic only)
    'division': None,      # Division (jpnic only)
    'title': None,         # Title (jpnic only)
    'phone': None,         # Phone number
    'fax': None,           # Fax number (jpnic only)
    'updated': None        # Last-updated date (jpnic only)
}
# National Internet Registry
# National Internet Registry configuration: per-NIR query URL, HTTP
# request details, regex field maps used to parse the HTML responses,
# and datetime handling. All regex literals are raw strings; the
# previously non-raw continuation strings contained invalid escape
# sequences ('\<', '\/'), which newer Python versions warn about. The
# raw forms are regex-equivalent (re treats \<, \/ and \n identically).
NIR_WHOIS = {
    'jpnic': {
        'country_code': 'JP',
        'url': ('http://whois.nic.ad.jp/cgi-bin/whois_gw?lang=%2Fe&key={0}'
                '&submit=query'),
        'request_type': 'GET',
        'request_headers': {'Accept': 'text/html'},
        'form_data_ip_field': None,
        'fields': {
            'name': r'(\[Organization\])[^\S\n]+(?P<val>.*?)\n',
            'handle': r'(\[Network Name\])[^\S\n]+(?P<val>.*?)\n',
            'created': r'(\[Assigned Date\])[^\S\n]+(?P<val>.*?)\n',
            'updated': r'(\[Last Update\])[^\S\n]+(?P<val>.*?)\n',
            'nameservers': r'(\[Nameserver\])[^\S\n]+(?P<val>.*?)\n',
            'contact_admin': r'(\[Administrative Contact\])[^\S\n]+.+?\>'
                             r'(?P<val>.+?)\<\/A\>\n',
            'contact_tech': r'(\[Technical Contact\])[^\S\n]+.+?\>'
                            r'(?P<val>.+?)\<\/A\>\n'
        },
        'contact_fields': {
            'name': r'(\[Last, First\])[^\S\n]+(?P<val>.*?)\n',
            'email': r'(\[E-Mail\])[^\S\n]+(?P<val>.*?)\n',
            'reply_email': r'(\[Reply Mail\])[^\S\n]+(?P<val>.*?)\n',
            'organization': r'(\[Organization\])[^\S\n]+(?P<val>.*?)\n',
            'division': r'(\[Division\])[^\S\n]+(?P<val>.*?)\n',
            'title': r'(\[Title\])[^\S\n]+(?P<val>.*?)\n',
            'phone': r'(\[TEL\])[^\S\n]+(?P<val>.*?)\n',
            'fax': r'(\[FAX\])[^\S\n]+(?P<val>.*?)\n',
            'updated': r'(\[Last Update\])[^\S\n]+(?P<val>.*?)\n'
        },
        # JPNIC timestamps are JST (UTC+9); shifted back during parsing.
        'dt_format': '%Y/%m/%d %H:%M:%S(JST)',
        'dt_hourdelta': 9,
        'multi_net': False
    },
    'krnic': {
        'country_code': 'KR',
        'url': 'https://whois.kisa.or.kr/eng/whois.jsc',
        'request_type': 'POST',
        'request_headers': {'Accept': 'text/html'},
        'form_data_ip_field': 'query',
        'fields': {
            'name': r'(Organization Name)[\s]+\:[^\S\n]+(?P<val>.+?)\n',
            'handle': r'(Service Name|Network Type)[\s]+\:[^\S\n]+(?P<val>.+?)'
                      r'\n',
            'address': r'(Address)[\s]+\:[^\S\n]+(?P<val>.+?)\n',
            'postal_code': r'(Zip Code)[\s]+\:[^\S\n]+(?P<val>.+?)\n',
            'created': r'(Registration Date)[\s]+\:[^\S\n]+(?P<val>.+?)\n',
            'contact_admin': r'(id="eng_isp_contact").+?\>(?P<val>.*?)\<'
                             r'\/div\>\n',
            'contact_tech': r'(id="eng_user_contact").+?\>(?P<val>.*?)\<'
                            r'\/div\>\n'
        },
        'contact_fields': {
            'name': r'(Name)[^\S\n]+?:[^\S\n]+?(?P<val>.*?)\n',
            'email': r'(E-Mail)[^\S\n]+?:[^\S\n]+?(?P<val>.*?)\n',
            'phone': r'(Phone)[^\S\n]+?:[^\S\n]+?(?P<val>.*?)\n'
        },
        # KRNIC dates are date-only; no timezone shift applied.
        'dt_format': '%Y%m%d',
        'dt_hourdelta': 0,
        'multi_net': True
    }
}
class NIRWhois:
"""
The class for parsing whois data for NIRs (National Internet Registry).
JPNIC and KRNIC are currently the only NIRs supported. Output varies
based on NIR specific whois formatting.
Args:
net (:obj:`ipwhois.net.Net`): The network object.
Raises:
NetError: The parameter provided is not an instance of
ipwhois.net.Net
IPDefinedError: The address provided is defined (does not need to be
resolved).
"""
def __init__(self, net):
    """
    Store the ipwhois.net.Net instance used for NIR lookups.
    """

    from .net import Net

    # Reject anything that is not an ipwhois.net.Net instance.
    if not isinstance(net, Net):

        raise NetError('The provided net parameter is not an instance of '
                       'ipwhois.net.Net')

    self._net = net
def parse_fields(self, response, fields_dict, net_start=None,
                 net_end=None, dt_format=None, field_list=None,
                 hourdelta=0, is_contact=False):
    """
    The function for parsing whois fields from a data input.

    Args:
        response (:obj:`str`): The response from the whois/rwhois server.
        fields_dict (:obj:`dict`): The mapping of fields to regex search
            values (required).
        net_start (:obj:`int`): The starting point of the network (if
            parsing multiple networks). Defaults to None.
        net_end (:obj:`int`): The ending point of the network (if parsing
            multiple networks). Defaults to None.
        dt_format (:obj:`str`): The format of datetime fields if known.
            Defaults to None.
        field_list (:obj:`list` of :obj:`str`): If provided, fields to
            parse. Defaults to :obj:`ipwhois.nir.BASE_NET` if is_contact
            is False. Otherwise, defaults to
            :obj:`ipwhois.nir.BASE_CONTACT`.
        hourdelta (:obj:`int`): The timezone delta for created/updated
            fields. Defaults to 0.
        is_contact (:obj:`bool`): If True, uses contact information
            field parsing. Defaults to False.

    Returns:
        dict: A dictionary of fields provided in fields_dict, mapping to
            the results of the regex searches.
    """

    # Guarantee a trailing newline so the \n-terminated field regexes
    # can match the final line of the response.
    response = '{0}\n'.format(response)

    if is_contact:

        ret = {}

        if not field_list:

            field_list = list(BASE_CONTACT.keys())

    else:

        ret = {
            'contacts': {'admin': None, 'tech': None},
            'contact_admin': {},
            'contact_tech': {}
        }

        if not field_list:

            # Contacts are parsed via the contact_admin/contact_tech
            # patterns, not the aggregate 'contacts' key.
            field_list = list(BASE_NET.keys())
            field_list.remove('contacts')
            field_list.append('contact_admin')
            field_list.append('contact_tech')

    # Only run the regexes for fields the caller asked for.
    generate = ((field, pattern) for (field, pattern) in
                fields_dict.items() if field in field_list)

    for field, pattern in generate:

        pattern = re.compile(
            str(pattern),
            re.DOTALL
        )

        # finditer(string, pos, endpos): the search is restricted to the
        # slice [net_end, net_start) of the response. NOTE(review): the
        # argument order (net_end before net_start) is intentional per
        # the callers' offset convention — confirm against get_contact().
        if net_start is not None:

            match = pattern.finditer(response, net_end, net_start)

        elif net_end is not None:

            match = pattern.finditer(response, net_end)

        else:

            match = pattern.finditer(response)

        # Collect every named 'val' capture; patterns without that group
        # raise IndexError and are skipped.
        values = []
        for m in match:

            try:

                values.append(m.group('val').strip())

            except IndexError:

                pass

        if len(values) > 0:

            value = None

            try:

                if field in ['created', 'updated'] and dt_format:

                    # Normalize to ISO 8601, shifting out of the NIR's
                    # local timezone by hourdelta hours.
                    value = (
                        datetime.strptime(
                            values[0],
                            str(dt_format)
                        ) - timedelta(hours=hourdelta)
                    ).isoformat('T')

                elif field in ['nameservers']:

                    # Nameservers stay a list; duplicates removed while
                    # preserving order.
                    value = list(unique_everseen(values))

                else:

                    # All other fields collapse to a newline-joined,
                    # order-preserving deduplicated string.
                    values = unique_everseen(values)
                    value = '\n'.join(values)

            except ValueError as e:

                # Leave value as None if datetime parsing fails.
                log.debug('NIR whois field parsing failed for {0}: {1}'
                          ''.format(field, e))
                pass

            ret[field] = value

    return ret
def _parse_fields(self, *args, **kwargs):
    """
    Deprecated alias for parse_fields(). This will be removed in a
    future release.
    """

    from warnings import warn

    warn('NIRWhois._parse_fields() has been deprecated and will be '
         'removed. You should now use NIRWhois.parse_fields().')

    result = self.parse_fields(*args, **kwargs)
    return result
def get_nets_jpnic(self, response):
    """
    The function for parsing network blocks from jpnic whois data.

    Args:
        response (:obj:`str`): The response from the jpnic server.

    Returns:
        list of dict: Mapping of networks with start and end positions.

        ::

            [{
                'cidr' (str) - The network routing block
                'start' (int) - The starting point of the network
                'end' (int) - The endpoint point of the network
            }]
    """

    networks = []

    # Each '[Network Number]' anchor line names one network block; pull
    # the CIDR out of the anchor text and remember the match offsets.
    anchor_re = r'^.*?(\[Network Number\])[^\S\n]+.+?>(?P<val>.+?)</A>$'
    for block_match in re.finditer(anchor_re, response, re.MULTILINE):

        try:

            entry = copy.deepcopy(BASE_NET)
            parsed = ip_network(block_match.group('val'))

            try:  # pragma: no cover
                first_addr = parsed.network_address
            except AttributeError:  # pragma: no cover
                # ipaddr (py2 fallback) names the attribute differently.
                first_addr = parsed.ip
                pass

            try:  # pragma: no cover
                last_addr = parsed.broadcast_address
            except AttributeError:  # pragma: no cover
                last_addr = parsed.broadcast
                pass

            entry['range'] = '{0} - {1}'.format(first_addr + 1, last_addr)
            entry['cidr'] = str(ip_network(block_match.group('val').strip()))
            entry['start'] = block_match.start()
            entry['end'] = block_match.end()
            networks.append(entry)

        except (ValueError, TypeError):

            # Skip anchors whose text is not a parseable network.
            pass

    return networks
def _get_nets_jpnic(self, *args, **kwargs):
"""
Deprecated. This will be removed in a future release.
"""
from warnings import warn
warn('NIRWhois._get_nets_jpnic() has been deprecated and will be '
'removed. You should now use NIRWhois.get_nets_jpnic().')
return self.get_nets_jpnic(*args, **kwargs)
def get_nets_krnic(self, response):
"""
The function for parsing network blocks from krnic whois data.
Args:
response (:obj:`str`): The response from the krnic server.
Returns:
list of dict: Mapping of networks with start and end positions.
::
[{
'cidr' (str) - The network routing block
'start' (int) - The starting point of the network
'end' (int) - The endpoint point of the network
}]
"""
nets = []
# Iterate through all of the networks found, storing the CIDR value
# and the start and end positions.
for match in re.finditer(
r'^(IPv4 Address)[\s]+:[^\S\n]+((.+?)[^\S\n]-[^\S\n](.+?)'
'[^\S\n]\((.+?)\)|.+)$',
response,
re.MULTILINE
):
try:
net = copy.deepcopy(BASE_NET)
net['range'] = match.group(2)
if match.group(3) and match.group(4):
addrs = []
addrs.extend(summarize_address_range(
ip_address(match.group(3).strip()),
ip_address(match.group(4).strip())))
cidr = ', '.join(
[i.__str__() for i in collapse_addresses(addrs)]
)
net['range'] = '{0} - {1}'.format(
match.group(3), match.group(4)
)
else:
cidr = ip_network(match.group(2).strip()).__str__()
net['cidr'] = cidr
net['start'] = match.start()
net['end'] = match.end()
nets.append(net)
except (ValueError, TypeError):
pass
return nets
def _get_nets_krnic(self, *args, **kwargs):
"""
Deprecated. This will be removed in a future release.
"""
from warnings import warn
warn('NIRWhois._get_nets_krnic() has been deprecated and will be '
'removed. You should now use NIRWhois.get_nets_krnic().')
return self.get_nets_krnic(*args, **kwargs)
def get_contact(self, response=None, nir=None, handle=None,
retry_count=3, dt_format=None):
"""
The function for retrieving and parsing NIR whois data based on
NIR_WHOIS contact_fields.
Args:
response (:obj:`str`): Optional response object, this bypasses the
lookup.
nir (:obj:`str`): The NIR to query ('jpnic' or 'krnic'). Required
if response is None.
handle (:obj:`str`): For NIRs that have separate contact queries
(JPNIC), this is the contact handle to use in the query.
Defaults to None.
retry_count (:obj:`int`): The number of times to retry in case
socket errors, timeouts, connection resets, etc. are
encountered. Defaults to 3.
dt_format (:obj:`str`): The format of datetime fields if known.
Defaults to None.
Returns:
dict: Mapping of the fields provided in contact_fields, to their
parsed results.
"""
if response or nir == 'krnic':
contact_response = response
else:
# Retrieve the whois data.
contact_response = self._net.get_http_raw(
url=str(NIR_WHOIS[nir]['url']).format(handle),
retry_count=retry_count,
headers=NIR_WHOIS[nir]['request_headers'],
request_type=NIR_WHOIS[nir]['request_type']
)
return self.parse_fields(
response=contact_response,
fields_dict=NIR_WHOIS[nir]['contact_fields'],
dt_format=dt_format,
hourdelta=int(NIR_WHOIS[nir]['dt_hourdelta']),
is_contact=True
)
def _get_contact(self, *args, **kwargs):
"""
Deprecated. This will be removed in a future release.
"""
from warnings import warn
warn('NIRWhois._get_contact() has been deprecated and will be '
'removed. You should now use NIRWhois.get_contact().')
return self.get_contact(*args, **kwargs)
def lookup(self, nir=None, inc_raw=False, retry_count=3, response=None,
field_list=None, is_offline=False):
"""
The function for retrieving and parsing NIR whois information for an IP
address via HTTP (HTML scraping).
Args:
nir (:obj:`str`): The NIR to query ('jpnic' or 'krnic'). Required
if response is None.
inc_raw (:obj:`bool`, optional): Whether to include the raw
results in the returned dictionary. Defaults to False.
retry_count (:obj:`int`): The number of times to retry in case
socket errors, timeouts, connection resets, etc. are
encountered. Defaults to 3.
response (:obj:`str`): Optional response object, this bypasses the
NIR lookup. Required when is_offline=True.
field_list (:obj:`list` of :obj:`str`): If provided, fields to
parse. Defaults to :obj:`ipwhois.nir.BASE_NET`.
is_offline (:obj:`bool`): Whether to perform lookups offline. If
True, response and asn_data must be provided. Primarily used
for testing.
Returns:
dict: The NIR whois results:
::
{
'query' (str) - The IP address.
'nets' (list of dict) - Network information which consists
of the fields listed in the ipwhois.nir.NIR_WHOIS
dictionary.
'raw' (str) - Raw NIR whois results if the inc_raw
parameter is True.
}
"""
if nir not in NIR_WHOIS.keys():
raise KeyError('Invalid arg for nir (National Internet Registry')
# Create the return dictionary.
results = {
'query': self._net.address_str,
'raw': None
}
# Only fetch the response if we haven't already.
if response is None:
if is_offline:
raise KeyError('response argument required when '
'is_offline=True')
log.debug('Response not given, perform WHOIS lookup for {0}'
.format(self._net.address_str))
form_data = None
if NIR_WHOIS[nir]['form_data_ip_field']:
form_data = {NIR_WHOIS[nir]['form_data_ip_field']:
self._net.address_str}
# Retrieve the whois data.
response = self._net.get_http_raw(
url=str(NIR_WHOIS[nir]['url']).format(self._net.address_str),
retry_count=retry_count,
headers=NIR_WHOIS[nir]['request_headers'],
request_type=NIR_WHOIS[nir]['request_type'],
form_data=form_data
)
# If inc_raw parameter is True, add the response to return dictionary.
if inc_raw:
results['raw'] = response
nets = []
nets_response = None
if nir == 'jpnic':
nets_response = self.get_nets_jpnic(response)
elif nir == 'krnic':
nets_response = self.get_nets_krnic(response)
nets.extend(nets_response)
global_contacts = {}
# Iterate through all of the network sections and parse out the
# appropriate fields for each.
log.debug('Parsing NIR WHOIS data')
for index, net in enumerate(nets):
section_end = None
if index + 1 < len(nets):
section_end = nets[index + 1]['start']
try:
dt_format = NIR_WHOIS[nir]['dt_format']
except KeyError: # pragma: no cover
dt_format = None
temp_net = self.parse_fields(
response=response,
fields_dict=NIR_WHOIS[nir]['fields'],
net_start=section_end,
net_end=net['end'],
dt_format=dt_format,
field_list=field_list,
hourdelta=int(NIR_WHOIS[nir]['dt_hourdelta'])
)
temp_net['country'] = NIR_WHOIS[nir]['country_code']
contacts = {
'admin': temp_net['contact_admin'],
'tech': temp_net['contact_tech']
}
del (
temp_net['contact_admin'],
temp_net['contact_tech']
)
if not is_offline:
for key, val in contacts.items():
if len(val) > 0:
if isinstance(val, str):
val = val.splitlines()
for contact in val:
if contact in global_contacts.keys():
temp_net['contacts'][key] = (
global_contacts[contact]
)
else:
if nir == 'krnic':
tmp_response = contact
tmp_handle = None
else:
tmp_response = None
tmp_handle = contact
temp_net['contacts'][key] = self.get_contact(
response=tmp_response,
handle=tmp_handle,
nir=nir,
retry_count=retry_count,
dt_format=dt_format
)
global_contacts[contact] = (
temp_net['contacts'][key]
)
# Merge the net dictionaries.
net.update(temp_net)
# The start and end values are no longer needed.
del net['start'], net['end']
# Add the networks to the return dictionary.
results['nets'] = nets
return results

View file

@ -1,4 +1,4 @@
# Copyright (c) 2013, 2014, 2015, 2016 Philip Hane
# Copyright (c) 2013-2019 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@ -64,7 +64,8 @@ class _RDAPContact:
https://tools.ietf.org/html/rfc7095
Args:
vcard: The vcard list from an RDAP IP address query.
vcard (:obj:`list` of :obj:`list`): The vcard list from an RDAP IP
address query.
Raises:
InvalidEntityContactObject: vcard is not an RDAP entity contact
@ -93,7 +94,7 @@ class _RDAPContact:
The function for parsing the vcard name.
Args:
val: The value to parse.
val (:obj:`list`): The value to parse.
"""
self.vars['name'] = val[3].strip()
@ -103,7 +104,7 @@ class _RDAPContact:
The function for parsing the vcard kind.
Args:
val: The value to parse.
val (:obj:`list`): The value to parse.
"""
self.vars['kind'] = val[3].strip()
@ -113,7 +114,7 @@ class _RDAPContact:
The function for parsing the vcard address.
Args:
val: The value to parse.
val (:obj:`list`): The value to parse.
"""
ret = {
@ -151,7 +152,7 @@ class _RDAPContact:
The function for parsing the vcard phone numbers.
Args:
val: The value to parse.
val (:obj:`list`): The value to parse.
"""
ret = {
@ -183,7 +184,7 @@ class _RDAPContact:
The function for parsing the vcard email addresses.
Args:
val: The value to parse.
val (:obj:`list`): The value to parse.
"""
ret = {
@ -215,7 +216,7 @@ class _RDAPContact:
The function for parsing the vcard role.
Args:
val: The value to parse.
val (:obj:`list`): The value to parse.
"""
self.vars['role'] = val[3].strip()
@ -225,7 +226,7 @@ class _RDAPContact:
The function for parsing the vcard title.
Args:
val: The value to parse.
val (:obj:`list`): The value to parse.
"""
self.vars['title'] = val[3].strip()
@ -263,7 +264,7 @@ class _RDAPCommon:
https://tools.ietf.org/html/rfc7483#section-5
Args:
json_result: The JSON response from an RDAP query.
json_result (:obj:`dict`): The JSON response from an RDAP query.
Raises:
ValueError: vcard is not a known RDAP object.
@ -292,10 +293,11 @@ class _RDAPCommon:
https://tools.ietf.org/html/rfc7483#section-4.2
Args:
links_json: A json dictionary of links from RDAP results.
links_json (:obj:`dict`): A json mapping of links from RDAP
results.
Returns:
List: A unique list of found RDAP link dictionaries.
list of str: Unique RDAP links.
"""
ret = []
@ -314,10 +316,20 @@ class _RDAPCommon:
https://tools.ietf.org/html/rfc7483#section-4.3
Args:
notices_json: A json dictionary of notices from RDAP results.
notices_json (:obj:`dict`): A json mapping of notices from RDAP
results.
Returns:
List: A unique list of found RDAP notices dictionaries.
list of dict: Unique RDAP notices information:
::
[{
'title' (str) - The title/header of the notice.
'description' (str) - The description/body of the notice.
'links' (list) - Unique links returned by
:obj:`ipwhois.rdap._RDAPCommon.summarize_links()`.
}]
"""
ret = []
@ -354,7 +366,7 @@ class _RDAPCommon:
pass
if all(tmp.values()):
if any(tmp.values()):
ret.append(tmp)
@ -366,10 +378,20 @@ class _RDAPCommon:
https://tools.ietf.org/html/rfc7483#section-4.5
Args:
events_json: A json dictionary of events from RDAP results.
events_json (:obj:`dict`): A json mapping of events from RDAP
results.
Returns:
List: A unique list of found RDAP events dictionaries.
list of dict: Unique RDAP events information:
::
[{
'action' (str) - The reason for an event.
'timestamp' (str) - The timestamp for when an event
occurred.
'actor' (str) - The identifier for an event initiator.
}]
"""
ret = []
@ -440,7 +462,8 @@ class _RDAPNetwork(_RDAPCommon):
https://tools.ietf.org/html/rfc7483#section-5.4
Args:
json_result: The JSON response from an RDAP IP address query.
json_result (:obj:`dict`): The JSON response from an RDAP IP address
query.
Raises:
InvalidNetworkObject: json_result is not an RDAP network object.
@ -551,7 +574,7 @@ class _RDAPEntity(_RDAPCommon):
https://tools.ietf.org/html/rfc7483#section-5.1
Args:
json_result: The JSON response from an RDAP query.
json_result (:obj:`dict`): The JSON response from an RDAP query.
Raises:
InvalidEntityObject: json_result is not an RDAP entity object.
@ -645,7 +668,7 @@ class RDAP:
https://www.arin.net/resources/rdap.html
Args:
net: A ipwhois.net.Net object.
net (:obj:`ipwhois.net.Net`): The network object.
Raises:
NetError: The parameter provided is not an instance of
@ -673,34 +696,45 @@ class RDAP:
address via RDAP (HTTP).
Args:
inc_raw: Boolean for whether to include the raw results in the
returned dictionary.
retry_count: The number of times to retry in case socket errors,
timeouts, connection resets, etc. are encountered.
asn_data: Result dictionary from ipwhois.net.Net.lookup_asn().
Optional if the bootstrap parameter is True.
depth: How many levels deep to run queries when additional
referenced objects are found.
excluded_entities: A list of entity handles to not perform lookups.
response: Optional response object, this bypasses the RDAP lookup.
bootstrap: If True, performs lookups via ARIN bootstrap rather
than lookups based on ASN data.
rate_limit_timeout: The number of seconds to wait before retrying
when a rate limit notice is returned via rdap+json.
inc_raw (:obj:`bool`, optional): Whether to include the raw
results in the returned dictionary. Defaults to False.
retry_count (:obj:`int`): The number of times to retry in case
socket errors, timeouts, connection resets, etc. are
encountered. Defaults to 3.
asn_data (:obj:`dict`): Result from
:obj:`ipwhois.asn.IPASN.lookup`. Optional if the bootstrap
parameter is True.
depth (:obj:`int`): How many levels deep to run queries when
additional referenced objects are found. Defaults to 0.
excluded_entities (:obj:`list`): Entity handles to not perform
lookups. Defaults to None.
response (:obj:`str`): Optional response object, this bypasses the
RDAP lookup.
bootstrap (:obj:`bool`): If True, performs lookups via ARIN
bootstrap rather than lookups based on ASN data. Defaults to
False.
rate_limit_timeout (:obj:`int`): The number of seconds to wait
before retrying when a rate limit notice is returned via
rdap+json. Defaults to 120.
Returns:
Dictionary:
dict: The IP RDAP lookup results
:query: The IP address (String)
:network: Dictionary of values returned by _RDAPNetwork. The raw
result is included for each entity if the inc_raw parameter is
True.
:entities: List of entity keys referenced by the top level IP
address query.
:objects: Dictionary of objects with the handles as keys, and the
dictionary returned by _RDAPEntity, etc as the values. The raw
result is included for each object if the inc_raw parameter is
True.
::
{
'query' (str) - The IP address
'entities' (list) - Entity handles referred by the top
level query.
'network' (dict) - Network information which consists of
the fields listed in the ipwhois.rdap._RDAPNetwork
dict.
'objects' (dict) - Mapping of entity handle->entity dict
which consists of the fields listed in the
ipwhois.rdap._RDAPEntity dict. The raw result is
included for each object if the inc_raw parameter
is True.
}
"""
if not excluded_entities:
@ -747,6 +781,7 @@ class RDAP:
results['network'] = result_net.vars
results['entities'] = []
results['objects'] = {}
roles = {}
# Iterate through and parse the root level entities.
log.debug('Parsing RDAP root level entities')
@ -764,6 +799,16 @@ class RDAP:
results['entities'].append(ent['handle'])
try:
for tmp in ent['entities']:
roles[tmp['handle']] = tmp['roles']
except KeyError:
pass
except KeyError:
pass
@ -811,6 +856,27 @@ class RDAP:
result_ent.parse()
new_objects[ent] = result_ent.vars
new_objects[ent]['roles'] = None
try:
new_objects[ent]['roles'] = roles[ent]
except KeyError: # pragma: no cover
pass
try:
for tmp in response['entities']:
if tmp['handle'] not in roles:
roles[tmp['handle']] = tmp['roles']
except (IndexError, KeyError):
pass
if inc_raw:
new_objects[ent]['raw'] = response

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,269 @@
# Copyright (c) 2013-2019 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# CLI python script interface for ipwhois.utils lookups.
import argparse
from collections import OrderedDict
import json
from ipwhois.utils import (ipv4_lstrip_zeros, calculate_cidr, get_countries,
ipv4_is_defined, ipv6_is_defined, unique_everseen,
unique_addresses)
# CLI ANSI rendering: SGR escape sequences used to colorize terminal output
# when --colorize is passed.
ANSI = {
    'end': '\033[0m',  # reset all attributes
    'b': '\033[1m',  # bold
    'ul': '\033[4m',  # underline
    'red': '\033[31m',
    'green': '\033[32m',
    'yellow': '\033[33m',
    'cyan': '\033[36m'
}
# Setup the arg parser.
parser = argparse.ArgumentParser(
    description='ipwhois utilities CLI interface'
)
# Each option below maps to one function in ipwhois.utils; the dispatch
# code treats them as mutually exclusive (first match wins).
parser.add_argument(
    '--ipv4_lstrip_zeros',
    type=str,
    nargs=1,
    metavar='"IP ADDRESS"',
    help='Strip leading zeros in each octet of an IPv4 address.'
)
parser.add_argument(
    '--calculate_cidr',
    type=str,
    nargs=2,
    metavar='"IP ADDRESS"',
    help='Calculate a CIDR range(s) from a start and end IP address.'
)
parser.add_argument(
    '--get_countries',
    action='store_true',
    help='Output a dictionary containing ISO_3166-1 country codes to names.'
)
parser.add_argument(
    '--get_country',
    type=str,
    nargs=1,
    metavar='"COUNTRY CODE"',
    help='Output the ISO_3166-1 name for a country code.'
)
parser.add_argument(
    '--ipv4_is_defined',
    type=str,
    nargs=1,
    metavar='"IP ADDRESS"',
    help='Check if an IPv4 address is defined (in a reserved address range).'
)
parser.add_argument(
    '--ipv6_is_defined',
    type=str,
    nargs=1,
    metavar='"IP ADDRESS"',
    help='Check if an IPv6 address is defined (in a reserved address range).'
)
# The argument value is parsed from a JSON string into a Python object.
parser.add_argument(
    '--unique_everseen',
    type=json.loads,
    nargs=1,
    metavar='"ITERABLE"',
    help='List unique elements from input iterable, preserving the order.'
)
parser.add_argument(
    '--unique_addresses',
    type=str,
    nargs=1,
    metavar='"FILE PATH"',
    help='Search an input file, extracting, counting, and summarizing '
         'IPv4/IPv6 addresses/networks.'
)

# Output options
group = parser.add_argument_group('Output options')
group.add_argument(
    '--colorize',
    action='store_true',
    help='If set, colorizes the output using ANSI. Should work in most '
         'platform consoles.'
)

# Get the args (parsed at import/run time; this is a CLI script).
script_args = parser.parse_args()
# Dispatch on the (mutually exclusive) utility arguments. Each branch prints
# its result, colorized when --colorize is set. Exceptions are caught broadly
# and rendered as an error line, since this is a CLI entry point.
if script_args.ipv4_lstrip_zeros:

    print(ipv4_lstrip_zeros(address=script_args.ipv4_lstrip_zeros[0]))

elif script_args.calculate_cidr:

    try:

        result = calculate_cidr(
            start_address=script_args.calculate_cidr[0],
            end_address=script_args.calculate_cidr[1]
        )

        print('{0}Found {1} CIDR blocks for ({2}, {3}){4}:\n{5}'.format(
            ANSI['green'] if script_args.colorize else '',
            len(result),
            script_args.calculate_cidr[0],
            script_args.calculate_cidr[1],
            ANSI['end'] if script_args.colorize else '',
            '\n'.join(result)
        ))

    except Exception as e:

        print('{0}Error{1}: {2}'.format(ANSI['red'], ANSI['end'], str(e)))

elif script_args.get_countries:

    try:

        result = get_countries()

        # BUG FIX: OrderedDict.iteritems() is Python 2 only; on Python 3 it
        # raised AttributeError (swallowed below as 'Error: ...'). Use
        # items() so this branch works on both.
        print('{0}Found {1} countries{2}:\n{3}'.format(
            ANSI['green'] if script_args.colorize else '',
            len(result),
            ANSI['end'] if script_args.colorize else '',
            '\n'.join(['{0}: {1}'.format(k, v) for k, v in (
                OrderedDict(sorted(result.items())).items())])
        ))

    except Exception as e:

        print('{0}Error{1}: {2}'.format(ANSI['red'], ANSI['end'], str(e)))

elif script_args.get_country:

    try:

        countries = get_countries()
        result = countries[script_args.get_country[0].upper()]

        print('{0}Match found for country code ({1}){2}:\n{3}'.format(
            ANSI['green'] if script_args.colorize else '',
            script_args.get_country[0],
            ANSI['end'] if script_args.colorize else '',
            result
        ))

    except Exception as e:

        print('{0}Error{1}: {2}'.format(ANSI['red'], ANSI['end'], str(e)))

elif script_args.ipv4_is_defined:

    try:

        result = ipv4_is_defined(address=script_args.ipv4_is_defined[0])

        if result[0]:

            print('{0}{1} is defined{2}:\n{3}'.format(
                ANSI['green'] if script_args.colorize else '',
                script_args.ipv4_is_defined[0],
                ANSI['end'] if script_args.colorize else '',
                'Name: {0}\nRFC: {1}'.format(result[1], result[2])
            ))

        else:

            print('{0}{1} is not defined{2}'.format(
                ANSI['yellow'] if script_args.colorize else '',
                script_args.ipv4_is_defined[0],
                ANSI['end'] if script_args.colorize else ''
            ))

    except Exception as e:

        print('{0}Error{1}: {2}'.format(ANSI['red'], ANSI['end'], str(e)))

elif script_args.ipv6_is_defined:

    try:

        result = ipv6_is_defined(address=script_args.ipv6_is_defined[0])

        if result[0]:

            print('{0}{1} is defined{2}:\n{3}'.format(
                ANSI['green'] if script_args.colorize else '',
                script_args.ipv6_is_defined[0],
                ANSI['end'] if script_args.colorize else '',
                'Name: {0}\nRFC: {1}'.format(result[1], result[2])
            ))

        else:

            print('{0}{1} is not defined{2}'.format(
                ANSI['yellow'] if script_args.colorize else '',
                script_args.ipv6_is_defined[0],
                ANSI['end'] if script_args.colorize else ''
            ))

    except Exception as e:

        print('{0}Error{1}: {2}'.format(ANSI['red'], ANSI['end'], str(e)))

elif script_args.unique_everseen:

    try:

        result = list(unique_everseen(iterable=script_args.unique_everseen[0]))

        print('{0}Unique everseen{1}:\n{2}'.format(
            ANSI['green'] if script_args.colorize else '',
            ANSI['end'] if script_args.colorize else '',
            result
        ))

    except Exception as e:

        print('{0}Error{1}: {2}'.format(ANSI['red'], ANSI['end'], str(e)))

elif script_args.unique_addresses:

    try:

        result = unique_addresses(file_path=script_args.unique_addresses[0])

        # Sort addresses by descending occurrence count for display.
        tmp = []
        for k, v in sorted(result.items(), key=lambda kv: int(kv[1]['count']),
                           reverse=True):

            tmp.append('{0}{1}{2}: Count: {3}, Ports: {4}'.format(
                ANSI['b'] if script_args.colorize else '',
                k,
                ANSI['end'] if script_args.colorize else '',
                v['count'],
                json.dumps(v['ports'])
            ))

        print('{0}Found {1} unique addresses{2}:\n{3}'.format(
            ANSI['green'] if script_args.colorize else '',
            len(result),
            ANSI['end'] if script_args.colorize else '',
            '\n'.join(tmp)
        ))

    except Exception as e:

        print('{0}Error{1}: {2}'.format(ANSI['red'], ANSI['end'], str(e)))

View file

@ -1,4 +1,4 @@
# Copyright (c) 2013, 2014, 2015, 2016 Philip Hane
# Copyright (c) 2013-2019 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@ -29,6 +29,8 @@ import re
import copy
import io
import csv
import random
from collections import namedtuple
import logging
if sys.version_info >= (3, 3): # pragma: no cover
@ -117,10 +119,10 @@ def ipv4_lstrip_zeros(address):
The function to strip leading zeros in each octet of an IPv4 address.
Args:
address: An IPv4 address in string format.
address (:obj:`str`): An IPv4 address.
Returns:
String: The modified IPv4 address string.
str: The modified IPv4 address.
"""
# Split the octets.
@ -141,11 +143,11 @@ def calculate_cidr(start_address, end_address):
The function to calculate a CIDR range(s) from a start and end IP address.
Args:
start_address: The starting IP address in string format.
end_address: The ending IP address in string format.
start_address (:obj:`str`): The starting IP address.
end_address (:obj:`str`): The ending IP address.
Returns:
List: A list of calculated CIDR ranges.
list of str: The calculated CIDR ranges.
"""
tmp_addrs = []
@ -179,12 +181,12 @@ def get_countries(is_legacy_xml=False):
to names.
Args:
is_legacy_xml: Boolean for whether to use the older country code
is_legacy_xml (:obj:`bool`): Whether to use the older country code
list (iso_3166-1_list_en.xml).
Returns:
Dictionary: A dictionary with the country codes as the keys and the
country names as the values.
dict: A mapping of country codes as the keys to the country names as
the values.
"""
# Initialize the countries dictionary.
@ -265,82 +267,95 @@ def ipv4_is_defined(address):
be resolved).
Args:
address: An IPv4 address in string format.
address (:obj:`str`): An IPv4 address.
Returns:
Tuple:
namedtuple:
:Boolean: True if given address is defined, otherwise False
:String: IETF assignment name if given address is defined, otherwise ''
:String: IETF assignment RFC if given address is defined, otherwise ''
:is_defined (bool): True if given address is defined, otherwise
False
:ietf_name (str): IETF assignment name if given address is
defined, otherwise ''
:ietf_rfc (str): IETF assignment RFC if given address is defined,
otherwise ''
"""
# Initialize the IP address object.
query_ip = IPv4Address(str(address))
# Initialize the results named tuple
results = namedtuple('ipv4_is_defined_results', 'is_defined, ietf_name, '
'ietf_rfc')
# This Network
if query_ip in IPv4Network('0.0.0.0/8'):
return True, 'This Network', 'RFC 1122, Section 3.2.1.3'
return results(True, 'This Network', 'RFC 1122, Section 3.2.1.3')
# Loopback
elif query_ip.is_loopback:
return True, 'Loopback', 'RFC 1122, Section 3.2.1.3'
return results(True, 'Loopback', 'RFC 1122, Section 3.2.1.3')
# Link Local
elif query_ip.is_link_local:
return True, 'Link Local', 'RFC 3927'
return results(True, 'Link Local', 'RFC 3927')
# IETF Protocol Assignments
elif query_ip in IPv4Network('192.0.0.0/24'):
return True, 'IETF Protocol Assignments', 'RFC 5736'
return results(True, 'IETF Protocol Assignments', 'RFC 5736')
# TEST-NET-1
elif query_ip in IPv4Network('192.0.2.0/24'):
return True, 'TEST-NET-1', 'RFC 5737'
return results(True, 'TEST-NET-1', 'RFC 5737')
# 6to4 Relay Anycast
elif query_ip in IPv4Network('192.88.99.0/24'):
return True, '6to4 Relay Anycast', 'RFC 3068'
return results(True, '6to4 Relay Anycast', 'RFC 3068')
# Network Interconnect Device Benchmark Testing
elif query_ip in IPv4Network('198.18.0.0/15'):
return (True,
return (results(True,
'Network Interconnect Device Benchmark Testing',
'RFC 2544')
'RFC 2544'))
# TEST-NET-2
elif query_ip in IPv4Network('198.51.100.0/24'):
return True, 'TEST-NET-2', 'RFC 5737'
return results(True, 'TEST-NET-2', 'RFC 5737')
# TEST-NET-3
elif query_ip in IPv4Network('203.0.113.0/24'):
return True, 'TEST-NET-3', 'RFC 5737'
return results(True, 'TEST-NET-3', 'RFC 5737')
# Multicast
elif query_ip.is_multicast:
return True, 'Multicast', 'RFC 3171'
return results(True, 'Multicast', 'RFC 3171')
# Limited Broadcast
elif query_ip in IPv4Network('255.255.255.255/32'):
return True, 'Limited Broadcast', 'RFC 919, Section 7'
return results(True, 'Limited Broadcast', 'RFC 919, Section 7')
# Private-Use Networks
elif query_ip.is_private:
return True, 'Private-Use Networks', 'RFC 1918'
return results(True, 'Private-Use Networks', 'RFC 1918')
return False, '', ''
# New IANA Reserved
# TODO: Someone needs to find the RFC for this
elif query_ip in IPv4Network('198.97.38.0/24'):
return results(True, 'IANA Reserved', '')
return results(False, '', '')
def ipv6_is_defined(address):
@ -349,55 +364,61 @@ def ipv6_is_defined(address):
be resolved).
Args:
address: An IPv6 address in string format.
address (:obj:`str`): An IPv6 address.
Returns:
Tuple:
namedtuple:
:Boolean: True if address is defined, otherwise False
:String: IETF assignment name if address is defined, otherwise ''
:String: IETF assignment RFC if address is defined, otherwise ''
:is_defined (bool): True if given address is defined, otherwise
False
:ietf_name (str): IETF assignment name if given address is
defined, otherwise ''
:ietf_rfc (str): IETF assignment RFC if given address is defined,
otherwise ''
"""
# Initialize the IP address object.
query_ip = IPv6Address(str(address))
# Initialize the results named tuple
results = namedtuple('ipv6_is_defined_results', 'is_defined, ietf_name, '
'ietf_rfc')
# Multicast
if query_ip.is_multicast:
return True, 'Multicast', 'RFC 4291, Section 2.7'
return results(True, 'Multicast', 'RFC 4291, Section 2.7')
# Unspecified
elif query_ip.is_unspecified:
return True, 'Unspecified', 'RFC 4291, Section 2.5.2'
return results(True, 'Unspecified', 'RFC 4291, Section 2.5.2')
# Loopback.
elif query_ip.is_loopback:
return True, 'Loopback', 'RFC 4291, Section 2.5.3'
return results(True, 'Loopback', 'RFC 4291, Section 2.5.3')
# Reserved
elif query_ip.is_reserved:
return True, 'Reserved', 'RFC 4291'
return results(True, 'Reserved', 'RFC 4291')
# Link-Local
elif query_ip.is_link_local:
return True, 'Link-Local', 'RFC 4291, Section 2.5.6'
return results(True, 'Link-Local', 'RFC 4291, Section 2.5.6')
# Site-Local
elif query_ip.is_site_local:
return True, 'Site-Local', 'RFC 4291, Section 2.5.7'
return results(True, 'Site-Local', 'RFC 4291, Section 2.5.7')
# Unique Local Unicast
elif query_ip.is_private:
return True, 'Unique Local Unicast', 'RFC 4193'
return results(True, 'Unique Local Unicast', 'RFC 4193')
return False, '', ''
return results(False, '', '')
def unique_everseen(iterable, key=None):
@ -406,11 +427,12 @@ def unique_everseen(iterable, key=None):
elements ever seen. This was taken from the itertools recipes.
Args:
iterable: An iterable to process.
key: Optional function to run when checking elements (e.g., str.lower)
iterable (:obj:`iter`): An iterable to process.
key (:obj:`callable`): Optional function to run when checking
elements (e.g., str.lower)
Returns:
Generator: Yields a generator object.
Yields:
The next unique element found.
"""
seen = set()
@ -442,17 +464,23 @@ def unique_addresses(data=None, file_path=None):
If both a string and file_path are provided, it will process them both.
Args:
data: A string to process.
file_path: An optional file path to process.
data (:obj:`str`): The data to process.
file_path (:obj:`str`): An optional file path to process.
Returns:
Dictionary:
dict: The addresses/networks mapped to ports and counts:
:ip address/network: Each address or network found is a dictionary w/\:
::
:count: Total number of times seen (Integer)
:ports: Dictionary with port numbers as keys and the number of
times seen for this ip as values (Dictionary)
{
'1.2.3.4' (dict) - Each address or network found is a
dictionary:
{
'count' (int) - Total number of times seen.
'ports' (dict) - Mapping of port numbers as keys and
the number of times seen for this ip as values.
}
}
Raises:
ValueError: Arguments provided are invalid.
@ -551,3 +579,53 @@ def unique_addresses(data=None, file_path=None):
continue
return ret
def ipv4_generate_random(total=100):
"""
The generator to produce random, unique IPv4 addresses that are not
defined (can be looked up using ipwhois).
Args:
total (:obj:`int`): The total number of IPv4 addresses to generate.
Yields:
str: The next IPv4 address.
"""
count = 0
yielded = set()
while count < total:
address = str(IPv4Address(random.randint(0, 2**32-1)))
if not ipv4_is_defined(address)[0] and address not in yielded:
count += 1
yielded.add(address)
yield address
def ipv6_generate_random(total=100):
"""
The generator to produce random, unique IPv6 addresses that are not
defined (can be looked up using ipwhois).
Args:
total (:obj:`int`): The total number of IPv6 addresses to generate.
Yields:
str: The next IPv6 address.
"""
count = 0
yielded = set()
while count < total:
address = str(IPv6Address(random.randint(0, 2**128-1)))
if not ipv6_is_defined(address)[0] and address not in yielded:
count += 1
yielded.add(address)
yield address

View file

@ -1,4 +1,4 @@
# Copyright (c) 2013, 2014, 2015, 2016 Philip Hane
# Copyright (c) 2013-2019 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@ -186,7 +186,7 @@ class Whois:
The class for parsing via whois
Args:
net: A ipwhois.net.Net object.
net (:obj:`ipwhois.net.Net`): The network object.
Raises:
NetError: The parameter provided is not an instance of
@ -209,25 +209,33 @@ class Whois:
raise NetError('The provided net parameter is not an instance of '
'ipwhois.net.Net')
def _parse_fields(self, response, fields_dict, net_start=None,
net_end=None, dt_format=None, field_list=None):
def parse_fields(self, response, fields_dict, net_start=None,
net_end=None, dt_format=None, field_list=None):
"""
The function for parsing whois fields from a data input.
Args:
response: The response from the whois/rwhois server.
fields_dict: The dictionary of fields -> regex search values.
net_start: The starting point of the network (if parsing multiple
networks).
net_end: The ending point of the network (if parsing multiple
networks).
dt_format: The format of datetime fields if known.
field_list: If provided, a list of fields to parse:
['name', 'handle', 'description', 'country', 'state', 'city',
'address', 'postal_code', 'emails', 'created', 'updated']
response (:obj:`str`): The response from the whois/rwhois server.
fields_dict (:obj:`dict`): The mapping of fields to regex search
values (required).
net_start (:obj:`int`): The starting point of the network (if
parsing multiple networks). Defaults to None.
net_end (:obj:`int`): The ending point of the network (if parsing
multiple networks). Defaults to None.
dt_format (:obj:`str`): The format of datetime fields if known.
Defaults to None.
field_list (:obj:`list` of :obj:`str`): If provided, fields to
parse. Defaults to:
::
['name', 'handle', 'description', 'country', 'state',
'city', 'address', 'postal_code', 'emails', 'created',
'updated']
Returns:
Dictionary: A dictionary of fields provided in fields_dict.
dict: A dictionary of fields provided in fields_dict, mapping to
the results of the regex searches.
"""
ret = {}
@ -297,10 +305,14 @@ class Whois:
values[0],
str(dt_format)).isoformat('T')
elif field in ['emails']:
value = list(unique_everseen(values))
else:
values = unique_everseen(values)
value = '\n'.join(values)
value = '\n'.join(values).strip()
except ValueError as e:
@ -312,15 +324,33 @@ class Whois:
return ret
def _get_nets_arin(self, response):
def _parse_fields(self, *args, **kwargs):
"""
Deprecated. This will be removed in a future release.
"""
from warnings import warn
warn('Whois._parse_fields() has been deprecated and will be '
'removed. You should now use Whois.parse_fields().')
return self.parse_fields(*args, **kwargs)
def get_nets_arin(self, response):
"""
The function for parsing network blocks from ARIN whois data.
Args:
response: The response from the ARIN whois server.
response (:obj:`str`): The response from the ARIN whois server.
Returns:
List: A of dictionaries containing keys: cidr, start, end.
list of dict: Mapping of networks with start and end positions.
::
[{
'cidr' (str) - The network routing block
'start' (int) - The starting point of the network
'end' (int) - The endpoint point of the network
}]
"""
nets = []
@ -359,7 +389,17 @@ class Whois:
if net_range is not None:
if net_range_start < match.start() or len(nets) > 0:
net['range'] = net_range
try:
net['range'] = '{0} - {1}'.format(
ip_network(net_range)[0].__str__(),
ip_network(net_range)[-1].__str__()
) if '/' in net_range else net_range
except ValueError: # pragma: no cover
net['range'] = net_range
net['cidr'] = ', '.join(
[ip_network(c.strip()).__str__()
@ -375,15 +415,33 @@ class Whois:
return nets
def _get_nets_lacnic(self, response):
def _get_nets_arin(self, *args, **kwargs):
"""
Deprecated. This will be removed in a future release.
"""
from warnings import warn
warn('Whois._get_nets_arin() has been deprecated and will be '
'removed. You should now use Whois.get_nets_arin().')
return self.get_nets_arin(*args, **kwargs)
def get_nets_lacnic(self, response):
"""
The function for parsing network blocks from LACNIC whois data.
Args:
response: The response from the LACNIC whois server.
response (:obj:`str`): The response from the LACNIC whois server.
Returns:
List: A of dictionaries containing keys: cidr, start, end.
list of dict: Mapping of networks with start and end positions.
::
[{
'cidr' (str) - The network routing block
'start' (int) - The starting point of the network
'end' (int) - The endpoint point of the network
}]
"""
nets = []
@ -399,10 +457,21 @@ class Whois:
try:
net = copy.deepcopy(BASE_NET)
net['range'] = match.group(2).strip()
net_range = match.group(2).strip()
try:
net['range'] = net['range'] = '{0} - {1}'.format(
ip_network(net_range)[0].__str__(),
ip_network(net_range)[-1].__str__()
) if '/' in net_range else net_range
except ValueError: # pragma: no cover
net['range'] = net_range
temp = []
for addr in match.group(2).strip().split(', '):
for addr in net_range.split(', '):
count = addr.count('.')
if count is not 0 and count < 4:
@ -426,15 +495,33 @@ class Whois:
return nets
def _get_nets_other(self, response):
def _get_nets_lacnic(self, *args, **kwargs):
"""
Deprecated. This will be removed in a future release.
"""
from warnings import warn
warn('Whois._get_nets_lacnic() has been deprecated and will be '
'removed. You should now use Whois.get_nets_lacnic().')
return self.get_nets_lacnic(*args, **kwargs)
def get_nets_other(self, response):
"""
The function for parsing network blocks from generic whois data.
Args:
response: The response from the whois/rwhois server.
response (:obj:`str`): The response from the whois/rwhois server.
Returns:
List: A of dictionaries containing keys: cidr, start, end.
list of dict: Mapping of networks with start and end positions.
::
[{
'cidr' (str) - The network routing block
'start' (int) - The starting point of the network
'end' (int) - The endpoint point of the network
}]
"""
nets = []
@ -451,7 +538,18 @@ class Whois:
try:
net = copy.deepcopy(BASE_NET)
net['range'] = match.group(2)
net_range = match.group(2).strip()
try:
net['range'] = net['range'] = '{0} - {1}'.format(
ip_network(net_range)[0].__str__(),
ip_network(net_range)[-1].__str__()
) if '/' in net_range else net_range
except ValueError: # pragma: no cover
net['range'] = net_range
if match.group(3) and match.group(4):
@ -466,7 +564,7 @@ class Whois:
else:
cidr = ip_network(match.group(2).strip()).__str__()
cidr = ip_network(net_range).__str__()
net['cidr'] = cidr
net['start'] = match.start()
@ -479,6 +577,16 @@ class Whois:
return nets
def _get_nets_other(self, *args, **kwargs):
"""
Deprecated. This will be removed in a future release.
"""
from warnings import warn
warn('Whois._get_nets_other() has been deprecated and will be '
'removed. You should now use Whois.get_nets_other().')
return self.get_nets_other(*args, **kwargs)
def lookup(self, inc_raw=False, retry_count=3, response=None,
get_referral=False, extra_blacklist=None,
ignore_referral_errors=False, asn_data=None,
@ -488,42 +596,60 @@ class Whois:
address via port 43/tcp (WHOIS).
Args:
inc_raw: Boolean for whether to include the raw results in the
returned dictionary.
retry_count: The number of times to retry in case socket errors,
timeouts, connection resets, etc. are encountered.
response: Optional response object, this bypasses the Whois lookup.
get_referral: Boolean for whether to retrieve referral whois
information, if available.
extra_blacklist: A list of blacklisted whois servers in addition to
the global BLACKLIST.
ignore_referral_errors: Boolean for whether to ignore and continue
when an exception is encountered on referral whois lookups.
asn_data: Optional ASN result object, this bypasses the ASN lookup.
field_list: If provided, a list of fields to parse:
['name', 'handle', 'description', 'country', 'state', 'city',
'address', 'postal_code', 'emails', 'created', 'updated']
is_offline: Boolean for whether to perform lookups offline. If
inc_raw (:obj:`bool`, optional): Whether to include the raw
results in the returned dictionary. Defaults to False.
retry_count (:obj:`int`): The number of times to retry in case
socket errors, timeouts, connection resets, etc. are
encountered. Defaults to 3.
response (:obj:`str`): Optional response object, this bypasses the
NIR lookup. Required when is_offline=True.
get_referral (:obj:`bool`): Whether to retrieve referral whois
information, if available. Defaults to False.
extra_blacklist (:obj:`list`): Blacklisted whois servers in
addition to the global BLACKLIST. Defaults to None.
ignore_referral_errors (:obj:`bool`): Whether to ignore and
continue when an exception is encountered on referral whois
lookups. Defaults to False.
asn_data (:obj:`dict`): Result from
:obj:`ipwhois.asn.IPASN.lookup` (required).
field_list (:obj:`list` of :obj:`str`): If provided, fields to
parse. Defaults to:
::
['name', 'handle', 'description', 'country', 'state',
'city', 'address', 'postal_code', 'emails', 'created',
'updated']
is_offline (:obj:`bool`): Whether to perform lookups offline. If
True, response and asn_data must be provided. Primarily used
for testing.
for testing. Defaults to False.
Returns:
Dictionary:
dict: The IP whois lookup results
:query: The IP address (String)
:asn: The Autonomous System Number (String)
:asn_date: The ASN Allocation date (String)
:asn_registry: The assigned ASN registry (String)
:asn_cidr: The assigned ASN CIDR (String)
:asn_country_code: The assigned ASN country code (String)
:nets: Dictionaries containing network information which consists
of the fields listed in the NIC_WHOIS dictionary. (List)
:raw: Raw whois results if the inc_raw parameter is True. (String)
:referral: Dictionary of referral whois information if get_referral
is True and the server isn't blacklisted. Consists of fields
listed in the RWHOIS dictionary.
:raw_referral: Raw referral whois results if the inc_raw parameter
is True. (String)
::
{
'query' (str) - The IP address
'asn' (str) - The Autonomous System Number
'asn_date' (str) - The ASN Allocation date
'asn_registry' (str) - The assigned ASN registry
'asn_cidr' (str) - The assigned ASN CIDR
'asn_country_code' (str) - The assigned ASN country code
'asn_description' (str) - The ASN description
'nets' (list) - Dictionaries containing network
information which consists of the fields listed in the
ipwhois.whois.RIR_WHOIS dictionary.
'raw' (str) - Raw whois results if the inc_raw parameter
is True.
'referral' (dict) - Referral whois information if
get_referral is True and the server is not blacklisted.
Consists of fields listed in the ipwhois.whois.RWHOIS
dictionary.
'raw_referral' (str) - Raw referral whois results if the
inc_raw parameter is True.
}
"""
# Create the return dictionary.
@ -614,7 +740,7 @@ class Whois:
results['raw_referral'] = response_ref
temp_rnet = self._parse_fields(
temp_rnet = self.parse_fields(
response_ref,
RWHOIS['fields'],
field_list=field_list
@ -632,15 +758,15 @@ class Whois:
if asn_data['asn_registry'] == 'arin':
nets_response = self._get_nets_arin(response)
nets_response = self.get_nets_arin(response)
elif asn_data['asn_registry'] == 'lacnic':
nets_response = self._get_nets_lacnic(response)
nets_response = self.get_nets_lacnic(response)
else:
nets_response = self._get_nets_other(response)
nets_response = self.get_nets_other(response)
nets.extend(nets_response)
@ -662,7 +788,7 @@ class Whois:
dt_format = None
temp_net = self._parse_fields(
temp_net = self.parse_fields(
response,
RIR_WHOIS[asn_data['asn_registry']]['fields'],
section_end,