Include posters in Twitter notifications

* Also cleanup Facebook
This commit is contained in:
JonnyWong16 2016-05-15 00:03:45 -07:00
commit acc18b8d68
25 changed files with 6970 additions and 4745 deletions

56
lib/twitter/__init__.py Normal file
View file

@@ -0,0 +1,56 @@
#!/usr/bin/env python
#
# vim: sw=2 ts=2 sts=2
#
# Copyright 2007 The Python-Twitter Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A library that provides a Python interface to the Twitter API"""
from __future__ import absolute_import
__author__ = 'The Python-Twitter Developers'
__email__ = 'python-twitter@googlegroups.com'
__copyright__ = 'Copyright (c) 2007-2016 The Python-Twitter Developers'
__license__ = 'Apache License 2.0'
__version__ = '3.0rc1'
__url__ = 'https://github.com/bear/python-twitter'
__download_url__ = 'https://pypi.python.org/pypi/python-twitter'
__description__ = 'A Python wrapper around the Twitter API'
import json # noqa
try:
from hashlib import md5 # noqa
except ImportError:
from md5 import md5 # noqa
from ._file_cache import _FileCache # noqa
from .error import TwitterError # noqa
from .parse_tweet import ParseTweet # noqa
from .models import ( # noqa
Category, # noqa
DirectMessage, # noqa
Hashtag, # noqa
List, # noqa
Media, # noqa
Trend, # noqa
Url, # noqa
User, # noqa
UserStatus, # noqa
Status # noqa
)
from .api import Api # noqa

161
lib/twitter/_file_cache.py Normal file
View file

@@ -0,0 +1,161 @@
#!/usr/bin/env python
import errno
import os
import re
import tempfile
from hashlib import md5
class _FileCacheError(Exception):
"""Base exception class for FileCache related errors"""
class _FileCache(object):
DEPTH = 3
def __init__(self, root_directory=None):
self._InitializeRootDirectory(root_directory)
def Get(self, key):
path = self._GetPath(key)
if os.path.exists(path):
with open(path) as f:
return f.read()
else:
return None
def Set(self, key, data):
path = self._GetPath(key)
directory = os.path.dirname(path)
if not os.path.exists(directory):
os.makedirs(directory)
if not os.path.isdir(directory):
raise _FileCacheError('%s exists but is not a directory' % directory)
temp_fd, temp_path = tempfile.mkstemp()
temp_fp = os.fdopen(temp_fd, 'w')
temp_fp.write(data)
temp_fp.close()
if not path.startswith(self._root_directory):
raise _FileCacheError('%s does not appear to live under %s' %
(path, self._root_directory))
if os.path.exists(path):
os.remove(path)
os.rename(temp_path, path)
def Remove(self, key):
path = self._GetPath(key)
if not path.startswith(self._root_directory):
raise _FileCacheError('%s does not appear to live under %s' %
(path, self._root_directory ))
if os.path.exists(path):
os.remove(path)
def GetCachedTime(self, key):
path = self._GetPath(key)
if os.path.exists(path):
return os.path.getmtime(path)
else:
return None
def _GetUsername(self):
"""Attempt to find the username in a cross-platform fashion."""
try:
return os.getenv('USER') or \
os.getenv('LOGNAME') or \
os.getenv('USERNAME') or \
os.getlogin() or \
'nobody'
except (AttributeError, IOError, OSError):
return 'nobody'
def _GetTmpCachePath(self):
username = self._GetUsername()
cache_directory = 'python.cache_' + username
return os.path.join(tempfile.gettempdir(), cache_directory)
def _InitializeRootDirectory(self, root_directory):
if not root_directory:
root_directory = self._GetTmpCachePath()
root_directory = os.path.abspath(root_directory)
try:
os.mkdir(root_directory)
except OSError as e:
if e.errno == errno.EEXIST and os.path.isdir(root_directory):
# directory already exists
pass
else:
# exists but is a file, or no permissions, or...
raise
self._root_directory = root_directory
def _GetPath(self, key):
try:
hashed_key = md5(key.encode('utf-8')).hexdigest()
except TypeError:
hashed_key = md5.new(key).hexdigest()
return os.path.join(self._root_directory,
self._GetPrefix(hashed_key),
hashed_key)
def _GetPrefix(self, hashed_key):
return os.path.sep.join(hashed_key[0:_FileCache.DEPTH])
class ParseTweet(object):
    """Light-weight tweet parser extracting @handles, #hashtags, URL-like
    substrings and the RT/MT markers via pre-compiled regular expressions."""

    # compile once on import
    regexp = {"RT": "^RT", "MT": r"^MT", "ALNUM": r"(@[a-zA-Z0-9_]+)",
              "HASHTAG": r"(#[\w\d]+)", "URL": r"([http://]?[a-zA-Z\d\/]+[\.]+[a-zA-Z\d\/\.]+)"}
    regexp = dict((key, re.compile(value)) for key, value in list(regexp.items()))

    def __init__(self, timeline_owner, tweet):
        """timeline_owner: twitter handle of the user account.
        tweet: up to 140 chars from the feed; all parsing happens here.

        Exposes: RT, MT (bool), URLs, Hashtags, UserHandles (lists), Owner.
        """
        self.Owner = timeline_owner
        self.tweet = tweet
        self.RT = ParseTweet.getAttributeRT(tweet)
        self.MT = ParseTweet.getAttributeMT(tweet)
        self.UserHandles = ParseTweet.getUserHandles(tweet)
        self.Hashtags = ParseTweet.getHashtags(tweet)
        self.URLs = ParseTweet.getURLs(tweet)
        # additional intelligence: a retweet is attributed to the first
        # mentioned handle rather than the timeline owner.
        if self.RT and self.UserHandles:
            self.Owner = self.UserHandles[0]

    def __str__(self):
        """Human-readable summary of the parsed tweet."""
        return "owner {0}, urls: {1}, hashtags {2}, user_handles {3}, len_tweet {4}, RT = {5}, MT = {6}".format(
            self.Owner, len(self.URLs), len(self.Hashtags),
            len(self.UserHandles), len(self.tweet), self.RT, self.MT)

    @staticmethod
    def getAttributeRT(tweet):
        """True when the tweet begins with the RT (retweet) marker."""
        return ParseTweet.regexp["RT"].search(tweet.strip()) is not None

    @staticmethod
    def getAttributeMT(tweet):
        """True when the tweet begins with the MT (modified tweet) marker."""
        return ParseTweet.regexp["MT"].search(tweet.strip()) is not None

    @staticmethod
    def getUserHandles(tweet):
        """All @handles, in order of occurrence."""
        return ParseTweet.regexp["ALNUM"].findall(tweet)

    @staticmethod
    def getHashtags(tweet):
        """All #hashtags, in order of occurrence."""
        return ParseTweet.regexp["HASHTAG"].findall(tweet)

    @staticmethod
    def getURLs(tweet):
        """All URL-like substrings matched by the (loose) URL pattern."""
        return ParseTweet.regexp["URL"].findall(tweet)

4534
lib/twitter/api.py Normal file

File diff suppressed because it is too large Load diff

10
lib/twitter/error.py Normal file
View file

@@ -0,0 +1,10 @@
#!/usr/bin/env python
class TwitterError(Exception):
    """Base exception type for all errors raised by the Twitter library."""

    @property
    def message(self):
        """The first positional argument this error was constructed with."""
        return self.args[0]

476
lib/twitter/models.py Normal file
View file

@@ -0,0 +1,476 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import json
from calendar import timegm
try:
from rfc822 import parsedate
except ImportError:
from email.utils import parsedate
class TwitterModel(object):
    """ Base class from which all twitter models will inherit. """

    def __init__(self, **kwargs):
        # Subclasses populate this with their field names and defaults.
        self.param_defaults = {}

    def __str__(self):
        """ Returns a string representation of TwitterModel. By default
        this is the same as AsJsonString(). """
        return self.AsJsonString()

    def __eq__(self, other):
        # Compare via the dict representation. Wrap in bool() so a falsy
        # `other` (e.g. None) yields False rather than leaking the operand
        # itself out of the comparison, as `other and ...` used to do.
        return bool(other and self.AsDict() == other.AsDict())

    def __ne__(self, other):
        return not self.__eq__(other)

    def AsJsonString(self):
        """ Returns the TwitterModel as a JSON string based on key/value
        pairs returned from the AsDict() method. """
        return json.dumps(self.AsDict(), sort_keys=True)

    def AsDict(self):
        """ Create a dictionary representation of the object. Please see inline
        comments on construction when dictionaries contain TwitterModels. """
        data = {}
        for (key, value) in self.param_defaults.items():
            # If the value is a list, we need to create a list to hold the
            # dicts created by an object supporting the AsDict() method,
            # i.e., if it inherits from TwitterModel. If the item in the list
            # doesn't support the AsDict() method, then we assign the value
            # directly. An example being a list of Media objects contained
            # within a Status object.
            if isinstance(getattr(self, key, None), (list, tuple, set)):
                data[key] = list()
                for subobj in getattr(self, key, None):
                    if getattr(subobj, 'AsDict', None):
                        data[key].append(subobj.AsDict())
                    else:
                        data[key].append(subobj)
            # Not a list, *but still a subclass of TwitterModel* and
            # and we can assign the data[key] directly with the AsDict()
            # method of the object. An example being a Status object contained
            # within a User object.
            elif getattr(getattr(self, key, None), 'AsDict', None):
                data[key] = getattr(self, key).AsDict()
            # If the value doesn't have an AsDict() method, i.e., it's not
            # something that subclasses TwitterModel, then we can use direct
            # assigment. NOTE: falsy values (0, False, '', empty containers)
            # are deliberately omitted from the output dict here.
            elif getattr(self, key, None):
                data[key] = getattr(self, key, None)
        return data

    @classmethod
    def NewFromJsonDict(cls, data, **kwargs):
        """ Create a new instance based on a JSON dict. Any kwargs should be
        supplied by the inherited, calling class.

        Args:
            data: A JSON dict, as converted from the JSON in the twitter API.
        """
        if kwargs:
            # Merge into a copy so the caller's dict is never mutated.
            data = dict(data)
            for key, val in kwargs.items():
                data[key] = val
        return cls(**data)
class Media(TwitterModel):
    """A class representing the Media component of a tweet. """

    def __init__(self, **kwargs):
        self.param_defaults = {
            'display_url': None,
            'expanded_url': None,
            'id': None,
            'media_url': None,
            'media_url_https': None,
            'type': None,
            'url': None,
        }
        for param in self.param_defaults:
            setattr(self, param, kwargs.get(param, self.param_defaults[param]))

    def __repr__(self):
        """Short identifying representation: id, type and display URL."""
        return "Media(ID=%s, Type=%s, DisplayURL='%s')" % (
            self.id, self.type, self.display_url)
class List(TwitterModel):
    """A class representing the List structure used by the twitter API. """

    def __init__(self, **kwargs):
        self.param_defaults = {
            'description': None,
            'following': None,
            'full_name': None,
            'id': None,
            'member_count': None,
            'mode': None,
            'name': None,
            'slug': None,
            'subscriber_count': None,
            'uri': None,
            'user': None,
        }
        for (param, default) in self.param_defaults.items():
            setattr(self, param, kwargs.get(param, default))
        # An embedded owner dict is promoted to a full User object.
        if 'user' in kwargs:
            self.user = User.NewFromJsonDict(kwargs.get('user'))

    def __repr__(self):
        # Guard the owner lookup: a List built without a 'user' kwarg has
        # user=None, and the previous unconditional `self.user.screen_name`
        # made repr() raise AttributeError in that case.
        return "List(ID={list_id}, FullName={full_name!r}, Slug={slug}, User={user})".format(
            list_id=self.id,
            full_name=self.full_name,
            slug=self.slug,
            user=self.user.screen_name if self.user else None)
class Category(TwitterModel):
    """A class representing the suggested user category structure. """

    def __init__(self, **kwargs):
        self.param_defaults = {
            'name': None,
            'size': None,
            'slug': None,
        }
        for param in self.param_defaults:
            setattr(self, param, kwargs.get(param, self.param_defaults[param]))

    def __repr__(self):
        """Short identifying representation: name, slug and size."""
        return "Category(Name=%r, Slug=%s, Size=%s)" % (
            self.name, self.slug, self.size)
class DirectMessage(TwitterModel):
    """A class representing a Direct Message. """

    def __init__(self, **kwargs):
        self.param_defaults = {
            'created_at': None,
            'id': None,
            'recipient_id': None,
            'recipient_screen_name': None,
            'sender_id': None,
            'sender_screen_name': None,
            'text': None,
        }
        for (param, default) in self.param_defaults.items():
            setattr(self, param, kwargs.get(param, default))

    def __repr__(self):
        # Truncate long messages so the repr stays readable.
        if self.text and len(self.text) > 140:
            text = "{text}[...]".format(text=self.text[:140])
        else:
            text = self.text
        # {text!r} already wraps the value in quotes; the previous
        # "Text='{text!r}'" form doubled them up (Text=''hi'').
        return "DirectMessage(ID={dm_id}, Sender={sender}, Created={time}, Text={text!r})".format(
            dm_id=self.id,
            sender=self.sender_screen_name,
            time=self.created_at,
            text=text)
class Trend(TwitterModel):
    """ A class representing a trending topic. """

    def __init__(self, **kwargs):
        self.param_defaults = {
            'events': None,
            'name': None,
            'promoted_content': None,
            'query': None,
            'timestamp': None,
            'url': None,
            'volume': None,
        }
        for param in self.param_defaults:
            setattr(self, param, kwargs.get(param, self.param_defaults[param]))

    def __repr__(self):
        """Short identifying representation: name, timestamp and URL."""
        return "Trend(Name=%r, Time=%s, URL=%s)" % (
            self.name, self.timestamp, self.url)
class Hashtag(TwitterModel):
    """ A class representing a twitter hashtag. """

    def __init__(self, **kwargs):
        self.param_defaults = {
            'text': None
        }
        for param in self.param_defaults:
            setattr(self, param, kwargs.get(param, self.param_defaults[param]))

    def __repr__(self):
        """Short identifying representation: the hashtag text."""
        return "Hashtag(Text=%r)" % (self.text,)
class Url(TwitterModel):
    """ A class representing an URL contained in a tweet. """

    def __init__(self, **kwargs):
        self.param_defaults = {
            'expanded_url': None,
            'url': None}
        for param in self.param_defaults:
            setattr(self, param, kwargs.get(param, self.param_defaults[param]))

    def __repr__(self):
        """Short identifying representation: wrapped and expanded URLs."""
        return "URL(URL=%s, ExpandedURL=%s)" % (self.url, self.expanded_url)
class UserStatus(TwitterModel):
    """ A class representing the UserStatus structure. This is an abbreviated
    form of the twitter.User object. """

    # Known connection flag names with their default (inactive) state.
    connections = {'following': False,
                   'followed_by': False,
                   'following_received': False,
                   'following_requested': False,
                   'blocking': False,
                   'muting': False}

    def __init__(self, **kwargs):
        self.param_defaults = {
            'blocking': False,
            'followed_by': False,
            'following': False,
            'following_received': False,
            'following_requested': False,
            'id': None,
            'id_str': None,
            'muting': False,
            'name': None,
            'screen_name': None,
        }
        for param in self.param_defaults:
            setattr(self, param, kwargs.get(param, self.param_defaults[param]))
        # The API may report active connections by name; flip each
        # recognized flag to True.
        if 'connections' in kwargs:
            for name in kwargs['connections']:
                if name in self.connections:
                    setattr(self, name, True)

    def __repr__(self):
        """Short identifying representation: id, screen name and the
        connection flags currently active."""
        active = [name for name in self.connections if getattr(self, name)]
        return "UserStatus(ID=%s, ScreenName=%s, Connections=[%s])" % (
            self.id, self.screen_name, ", ".join(active))
class User(TwitterModel):
    """A class representing the User structure. """

    def __init__(self, **kwargs):
        self.param_defaults = {
            'contributors_enabled': None,
            'created_at': None,
            'default_profile': None,
            'default_profile_image': None,
            'description': None,
            'favourites_count': None,
            'followers_count': None,
            'friends_count': None,
            'geo_enabled': None,
            'id': None,
            'lang': None,
            'listed_count': None,
            'location': None,
            'name': None,
            'notifications': None,
            'profile_background_color': None,
            'profile_background_image_url': None,
            'profile_background_tile': None,
            'profile_banner_url': None,
            'profile_image_url': None,
            'profile_link_color': None,
            'profile_sidebar_fill_color': None,
            'profile_text_color': None,
            'protected': None,
            'screen_name': None,
            'status': None,
            'statuses_count': None,
            'time_zone': None,
            'url': None,
            'utc_offset': None,
            'verified': None,
        }
        for (param, default) in self.param_defaults.items():
            setattr(self, param, kwargs.get(param, default))

    def __repr__(self):
        return "User(ID={uid}, ScreenName={sn})".format(
            uid=self.id,
            sn=self.screen_name)

    @classmethod
    def NewFromJsonDict(cls, data, **kwargs):
        """ Create a new User from a JSON dict, promoting any embedded
        'status' dict to a twitter.Status object.

        Args:
            data: A JSON dict, as converted from the JSON in the twitter API.
        """
        if data.get('status', None):
            # Status is defined later in this module and is resolved at call
            # time; referencing it directly avoids the circular import that
            # `from twitter import Status` created through the package root.
            status = Status.NewFromJsonDict(data.get('status'))
            return super(cls, cls).NewFromJsonDict(data=data, status=status)
        else:
            return super(cls, cls).NewFromJsonDict(data=data)
class Status(TwitterModel):
    """A class representing the Status structure used by the twitter API.
    """

    def __init__(self, **kwargs):
        self.param_defaults = {
            'contributors': None,
            'coordinates': None,
            'created_at': None,
            'current_user_retweet': None,
            'favorite_count': None,
            'favorited': None,
            'geo': None,
            'hashtags': None,
            'id': None,
            'id_str': None,
            'in_reply_to_screen_name': None,
            'in_reply_to_status_id': None,
            'in_reply_to_user_id': None,
            'lang': None,
            'location': None,
            'media': None,
            'place': None,
            'possibly_sensitive': None,
            'retweet_count': None,
            'retweeted': None,
            'retweeted_status': None,
            'scopes': None,
            'source': None,
            'text': None,
            'truncated': None,
            'urls': None,
            'user': None,
            'user_mentions': None,
            'withheld_copyright': None,
            'withheld_in_countries': None,
            'withheld_scope': None,
        }
        for param in self.param_defaults:
            setattr(self, param, kwargs.get(param, self.param_defaults[param]))

    @property
    def created_at_in_seconds(self):
        """ Get the time this status message was posted, in seconds since
        the epoch (1 Jan 1970).

        Returns:
            int: The time this status message was posted, in seconds since
            the epoch.
        """
        return timegm(parsedate(self.created_at))

    def __repr__(self):
        """ A string representation of this twitter.Status instance,
        built from the status ID, the author's screen name (when known),
        the creation time and the text. """
        fields = ["ID={0}".format(self.id)]
        if self.user:
            fields.append("ScreenName={0}".format(self.user.screen_name))
        fields.append("Created={0}".format(self.created_at))
        fields.append("Text={0!r}".format(self.text))
        return "Status({0})".format(", ".join(fields))

    @classmethod
    def NewFromJsonDict(cls, data, **kwargs):
        """ Create a new instance based on a JSON dict.

        Args:
            data: A JSON dict, as converted from the JSON in the twitter API

        Returns:
            A twitter.Status instance
        """
        # Values parsed out of the raw dict into richer model objects.
        parsed = dict.fromkeys(
            ['current_user_retweet', 'hashtags', 'media',
             'retweeted_status', 'urls', 'user', 'user_mentions'])
        if 'user' in data:
            parsed['user'] = User.NewFromJsonDict(data['user'])
        if 'retweeted_status' in data:
            parsed['retweeted_status'] = Status.NewFromJsonDict(data['retweeted_status'])
        if 'current_user_retweet' in data:
            parsed['current_user_retweet'] = data['current_user_retweet']['id']
        entities = data.get('entities', {})
        if 'urls' in entities:
            parsed['urls'] = [Url.NewFromJsonDict(u) for u in entities['urls']]
        if 'user_mentions' in entities:
            parsed['user_mentions'] = [User.NewFromJsonDict(u) for u in entities['user_mentions']]
        if 'hashtags' in entities:
            parsed['hashtags'] = [Hashtag.NewFromJsonDict(h) for h in entities['hashtags']]
        if 'media' in entities:
            parsed['media'] = [Media.NewFromJsonDict(m) for m in entities['media']]
        # the new extended entities take precedence for media
        extended = data.get('extended_entities', {})
        if 'media' in extended:
            parsed['media'] = [Media.NewFromJsonDict(m) for m in extended['media']]
        return super(cls, cls).NewFromJsonDict(data=data, **parsed)

View file

@@ -0,0 +1,98 @@
#!/usr/bin/env python
import re
class Emoticons:
    # Emoticon vocabularies used by ParseTweet.getAttributeEmoticon to
    # classify whitespace-separated tweet tokens. Matching is by exact
    # token equality, not substring search.
    POSITIVE = ["*O", "*-*", "*O*", "*o*", "* *",
                ":P", ":D", ":d", ":p",
                ";P", ";D", ";d", ";p",
                ":-)", ";-)", ":=)", ";=)",
                ":<)", ":>)", ";>)", ";=)",
                "=}", ":)", "(:;)",
                "(;", ":}", "{:", ";}",
                "{;:]",
                "[;", ":')", ";')", ":-3",
                "{;", ":]",
                ";-3", ":-x", ";-x", ":-X",
                ";-X", ":-}", ";-=}", ":-]",
                ";-]", ":-.)",
                "^_^", "^-^"]
    NEGATIVE = [":(", ";(", ":'(",
                "=(", "={", "):", ");",
                ")':", ")';", ")=", "}=",
                ";-{{", ";-{", ":-{{", ":-{",
                ":-(", ";-(",
                ":,)", ":'{",
                "[:", ";]"
                ]
class ParseTweet(object):
    """Tweet parser extracting @handles, #hashtags, URL-like substrings,
    RT/MT markers and emoticon tokens (see Emoticons)."""

    # compile once on import
    regexp = {"RT": "^RT", "MT": r"^MT", "ALNUM": r"(@[a-zA-Z0-9_]+)",
              "HASHTAG": r"(#[\w\d]+)", "URL": r"([https://|http://]?[a-zA-Z\d\/]+[\.]+[a-zA-Z\d\/\.]+)",
              "SPACES": r"\s+"}
    regexp = dict((key, re.compile(value)) for key, value in regexp.items())

    def __init__(self, timeline_owner, tweet):
        """timeline_owner: twitter handle of the user account.
        tweet: up to 140 chars from the feed; all parsing happens here.

        Exposes: RT, MT (bool), URLs, Hashtags, UserHandles, Emoticon
        (lists) and Owner (str).
        """
        self.Owner = timeline_owner
        self.tweet = tweet
        self.RT = ParseTweet.getAttributeRT(tweet)
        self.MT = ParseTweet.getAttributeMT(tweet)
        self.UserHandles = ParseTweet.getUserHandles(tweet)
        self.Hashtags = ParseTweet.getHashtags(tweet)
        self.URLs = ParseTweet.getURLs(tweet)
        self.Emoticon = ParseTweet.getAttributeEmoticon(tweet)
        # additional intelligence: a retweet is attributed to the first
        # mentioned handle rather than the timeline owner.
        if self.RT and self.UserHandles:
            self.Owner = self.UserHandles[0]

    def __str__(self):
        """Human-readable summary of the parsed tweet."""
        return "owner {0}, urls: {1}, hashtags {2}, user_handles {3}, len_tweet {4}, RT = {5}, MT = {6}".format(
            self.Owner, len(self.URLs), len(self.Hashtags),
            len(self.UserHandles), len(self.tweet), self.RT, self.MT)

    @staticmethod
    def getAttributeEmoticon(tweet):
        """Collect every whitespace-separated token that is a known positive
        or negative emoticon, in order of occurrence."""
        found = []
        for token in ParseTweet.regexp["SPACES"].split(tweet.strip()):
            if token in Emoticons.POSITIVE or token in Emoticons.NEGATIVE:
                found.append(token)
        return found

    @staticmethod
    def getAttributeRT(tweet):
        """True when the tweet begins with the RT (retweet) marker."""
        return ParseTweet.regexp["RT"].search(tweet.strip()) is not None

    @staticmethod
    def getAttributeMT(tweet):
        """True when the tweet begins with the MT (modified tweet) marker."""
        return ParseTweet.regexp["MT"].search(tweet.strip()) is not None

    @staticmethod
    def getUserHandles(tweet):
        """All @handles, in order of occurrence."""
        return ParseTweet.regexp["ALNUM"].findall(tweet)

    @staticmethod
    def getHashtags(tweet):
        """All #hashtags, in order of occurrence."""
        return ParseTweet.regexp["HASHTAG"].findall(tweet)

    @staticmethod
    def getURLs(tweet):
        """All URL-like substrings matched by the (loose) URL pattern."""
        return ParseTweet.regexp["URL"].findall(tweet)

215
lib/twitter/ratelimit.py Normal file
View file

@@ -0,0 +1,215 @@
from collections import namedtuple
import re
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
from twitter.twitter_utils import enf_type
# Rate-limit triple reported by Twitter's x-rate-limit-* headers.
EndpointRateLimit = namedtuple('EndpointRateLimit',
                               ['limit', 'remaining', 'reset'])

ResourceEndpoint = namedtuple('ResourceEndpoint', ['regex', 'resource'])

# Endpoints whose URLs embed identifiers and therefore cannot be mapped to
# their rate-limit resource family by simple string manipulation alone.
GEO_ID_PLACE_ID = ResourceEndpoint(re.compile(r'/geo/id/\d+'), "/geo/id/:place_id")
SAVED_SEARCHES_DESTROY_ID = ResourceEndpoint(re.compile(r'/saved_searches/destroy/\d+'), "/saved_searches/destroy/:id")
SAVED_SEARCHES_SHOW_ID = ResourceEndpoint(re.compile(r'/saved_searches/show/\d+'), "/saved_searches/show/:id")
STATUSES_RETWEETS_ID = ResourceEndpoint(re.compile(r'/statuses/retweets/\d+'), "/statuses/retweets/:id")
STATUSES_SHOW_ID = ResourceEndpoint(re.compile(r'/statuses/show'), "/statuses/show/:id")
USERS_SHOW_ID = ResourceEndpoint(re.compile(r'/users/show'), "/users/show/:id")
USERS_SUGGESTIONS_SLUG = ResourceEndpoint(re.compile(r'/users/suggestions/\w+$'), "/users/suggestions/:slug")
USERS_SUGGESTIONS_SLUG_MEMBERS = ResourceEndpoint(re.compile(r'/users/suggestions/.+/members'), "/users/suggestions/:slug/members")

NON_STANDARD_ENDPOINTS = [
    GEO_ID_PLACE_ID,
    SAVED_SEARCHES_DESTROY_ID,
    SAVED_SEARCHES_SHOW_ID,
    STATUSES_RETWEETS_ID,
    STATUSES_SHOW_ID,
    USERS_SHOW_ID,
    USERS_SUGGESTIONS_SLUG,
    USERS_SUGGESTIONS_SLUG_MEMBERS,
]


class RateLimit(object):
    """ Object to hold the rate limit status of various endpoints for
    the twitter.Api object.

    This object is generally attached to the API as Api.rate_limit, but is not
    created until the user makes a method call that uses _RequestUrl() or calls
    Api.InitializeRateLimit(), after which it gets created and populated with
    rate limit data from Twitter.

    See:
        https://dev.twitter.com/rest/public/rate-limits
        https://dev.twitter.com/rest/public/rate-limiting
        https://dev.twitter.com/rest/reference/get/application/rate_limit_status

    Once a resource (i.e., an endpoint) has been requested, Twitter's response
    carries the current status in the ``x-rate-limit-limit``,
    ``x-rate-limit-remaining`` and ``x-rate-limit-reset`` headers. ``limit``
    is the per-window ceiling for the endpoint, ``remaining`` is how many
    calls are left in the current window, and ``reset`` is the epoch time at
    which ``remaining`` resets. Any endpoint not covered by
    ``application/rate_limit_status.json`` is assumed to allow 15 requests
    per 15-minute window.
    """

    def __init__(self, **kwargs):
        """ Instantiates the RateLimit object. Takes a json dict as
        kwargs and maps it onto the object's dictionary, so a payload like::

            {"resources": {
                "help": {
                    "/help/privacy": {
                        "limit": 15,
                        "remaining": 15,
                        "reset": 1452254278
                    }
                }
            }}

        can be queried with::

            api.rate_limit.resources.get('help').get('/help/privacy')

        returning a dict of limit, remaining, and reset.
        """
        self.__dict__.update(kwargs)

    @staticmethod
    def url_to_resource(url):
        """ Take a fully qualified URL and attempt to return the rate limit
        resource family corresponding to it. For example:

            >>> RateLimit.url_to_resource('https://api.twitter.com/1.1/statuses/lookup.json?id=317')
            '/statuses/lookup'

        Args:
            url (str): URL to convert to a resource family.

        Returns:
            string: Resource family corresponding to the URL.
        """
        resource = urlparse(url).path.replace('/1.1', '').replace('.json', '')
        # Identifier-bearing endpoints map to their placeholder form.
        for non_std_endpoint in NON_STANDARD_ENDPOINTS:
            if re.match(non_std_endpoint.regex, resource):
                return non_std_endpoint.resource
        return resource

    def set_unknown_limit(self, url, limit, remaining, reset):
        """ Register the rate limit for a resource family not yet present
        in the object's dictionary (e.g. an endpoint added to the API after
        the ``rate_limit_status`` snapshot was taken).

        Args:
            url (str):
                URL of the endpoint being fetched.
            limit (int):
                Max number of times a user or app can hit the endpoint
                before being rate limited.
            remaining (int):
                Number of times a user or app can access the endpoint
                before being rate limited.
            reset (int):
                Epoch time at which the rate limit window will reset.
        """
        endpoint = self.url_to_resource(url)
        resource_family = endpoint.split('/')[1]
        # Merge into the existing family (creating the 'resources' table or
        # the family on demand) instead of replacing the whole family dict,
        # which used to discard limits recorded for sibling endpoints and
        # raise KeyError when 'resources' did not exist yet.
        resources = self.__dict__.setdefault('resources', {})
        resources.setdefault(resource_family, {})[endpoint] = {
            "limit": limit,
            "remaining": remaining,
            "reset": reset,
        }

    def get_limit(self, url):
        """ Gets an EndpointRateLimit object for the given url.

        Args:
            url (str):
                URL of the endpoint for which to return the rate limit
                status.

        Returns:
            namedtuple: EndpointRateLimit object containing rate limit
            information.
        """
        endpoint = self.url_to_resource(url)
        resource_family = endpoint.split('/')[1]
        try:
            family_rates = self.resources.get(resource_family).get(endpoint)
        except AttributeError:
            # No rate limit data at all yet; fall back to Twitter's
            # documented default of 15 requests per 15-minute window.
            return EndpointRateLimit(limit=15, remaining=15, reset=0)

        if not family_rates:
            self.set_unknown_limit(url, limit=15, remaining=15, reset=0)
            return EndpointRateLimit(limit=15, remaining=15, reset=0)

        return EndpointRateLimit(family_rates['limit'],
                                 family_rates['remaining'],
                                 family_rates['reset'])

    def set_limit(self, url, limit, remaining, reset):
        """ Set an endpoint's rate limits. The data used for each of the
        args should come from Twitter's ``x-rate-limit`` headers.

        Args:
            url (str):
                URL of the endpoint being fetched.
            limit (int):
                Max number of times a user or app can hit the endpoint
                before being rate limited.
            remaining (int):
                Number of times a user or app can access the endpoint
                before being rate limited.
            reset (int):
                Epoch time at which the rate limit window will reset.
        """
        endpoint = self.url_to_resource(url)
        resource_family = endpoint.split('/')[1]
        try:
            family_rates = self.resources.get(resource_family, {}).get(endpoint)
        except AttributeError:
            family_rates = None
        if family_rates is None:
            # Endpoint (or the whole resources table) not seen yet; register
            # it first so the assignments below have a dict to update. The
            # previous code dereferenced None when the family existed but
            # the endpoint did not.
            self.set_unknown_limit(url, limit, remaining, reset)
            family_rates = self.resources[resource_family][endpoint]

        family_rates['limit'] = enf_type('limit', int, limit)
        family_rates['remaining'] = enf_type('remaining', int, remaining)
        family_rates['reset'] = enf_type('reset', int, reset)

        return EndpointRateLimit(family_rates['limit'],
                                 family_rates['remaining'],
                                 family_rates['reset'])

View file

@@ -0,0 +1,265 @@
# encoding: utf-8
import mimetypes
import os
import re
import requests
from tempfile import NamedTemporaryFile
from twitter import TwitterError
# Known top-level domains (country-code, internationalized, and generic),
# used by URL_REGEXP below to decide whether a dotted token is a URL.
# The "neric:" entry is an artifact of the upstream list's gTLD section
# header and is kept verbatim for parity with python-twitter upstream.
TLDS = [
    "ac", "ad", "ae", "af", "ag", "ai", "al", "am", "an", "ao", "aq", "ar",
    "as", "at", "au", "aw", "ax", "az", "ba", "bb", "bd", "be", "bf", "bg",
    "bh", "bi", "bj", "bl", "bm", "bn", "bo", "bq", "br", "bs", "bt", "bv",
    "bw", "by", "bz", "ca", "cc", "cd", "cf", "cg", "ch", "ci", "ck", "cl",
    "cm", "cn", "co", "cr", "cu", "cv", "cw", "cx", "cy", "cz", "de", "dj",
    "dk", "dm", "do", "dz", "ec", "ee", "eg", "eh", "er", "es", "et", "eu",
    "fi", "fj", "fk", "fm", "fo", "fr", "ga", "gb", "gd", "ge", "gf", "gg",
    "gh", "gi", "gl", "gm", "gn", "gp", "gq", "gr", "gs", "gt", "gu", "gw",
    "gy", "hk", "hm", "hn", "hr", "ht", "hu", "id", "ie", "il", "im", "in",
    "io", "iq", "ir", "is", "it", "je", "jm", "jo", "jp", "ke", "kg", "kh",
    "ki", "km", "kn", "kp", "kr", "kw", "ky", "kz", "la", "lb", "lc", "li",
    "lk", "lr", "ls", "lt", "lu", "lv", "ly", "ma", "mc", "md", "me", "mf",
    "mg", "mh", "mk", "ml", "mm", "mn", "mo", "mp", "mq", "mr", "ms", "mt",
    "mu", "mv", "mw", "mx", "my", "mz", "na", "nc", "ne", "nf", "ng", "ni",
    "nl", "no", "np", "nr", "nu", "nz", "om", "pa", "pe", "pf", "pg", "ph",
    "pk", "pl", "pm", "pn", "pr", "ps", "pt", "pw", "py", "qa", "re", "ro",
    "rs", "ru", "rw", "sa", "sb", "sc", "sd", "se", "sg", "sh", "si", "sj",
    "sk", "sl", "sm", "sn", "so", "sr", "ss", "st", "su", "sv", "sx", "sy",
    "sz", "tc", "td", "tf", "tg", "th", "tj", "tk", "tl", "tm", "tn", "to",
    "tp", "tr", "tt", "tv", "tw", "tz", "ua", "ug", "uk", "um", "us", "uy",
    "uz", "va", "vc", "ve", "vg", "vi", "vn", "vu", "wf", "ws", "ye", "yt",
    "za", "zm", "zw", "ελ", "бел", "мкд", "мон", "рф", "срб", "укр", "қаз",
    "հայ", "الاردن", "الجزائر", "السعودية", "المغرب", "امارات", "ایران", "بھارت",
    "تونس", "سودان", "سورية", "عراق", "عمان", "فلسطين", "قطر", "مصر",
    "مليسيا", "پاکستان", "भारत", "বাংলা", "ভারত", "ਭਾਰਤ", "ભારત",
    "இந்தியா", "இலங்கை", "சிங்கப்பூர்", "భారత్", "ලංකා", "ไทย",
    "გე", "中国", "中國", "台湾", "台灣", "新加坡", "澳門", "香港", "한국", "neric:",
    "abb", "abbott", "abogado", "academy", "accenture", "accountant",
    "accountants", "aco", "active", "actor", "ads", "adult", "aeg", "aero",
    "afl", "agency", "aig", "airforce", "airtel", "allfinanz", "alsace",
    "amsterdam", "android", "apartments", "app", "aquarelle", "archi", "army",
    "arpa", "asia", "associates", "attorney", "auction", "audio", "auto",
    "autos", "axa", "azure", "band", "bank", "bar", "barcelona", "barclaycard",
    "barclays", "bargains", "bauhaus", "bayern", "bbc", "bbva", "bcn", "beer",
    "bentley", "berlin", "best", "bet", "bharti", "bible", "bid", "bike",
    "bing", "bingo", "bio", "biz", "black", "blackfriday", "bloomberg", "blue",
    "bmw", "bnl", "bnpparibas", "boats", "bond", "boo", "boots", "boutique",
    "bradesco", "bridgestone", "broker", "brother", "brussels", "budapest",
    "build", "builders", "business", "buzz", "bzh", "cab", "cafe", "cal",
    "camera", "camp", "cancerresearch", "canon", "capetown", "capital",
    "caravan", "cards", "care", "career", "careers", "cars", "cartier",
    "casa", "cash", "casino", "cat", "catering", "cba", "cbn", "ceb", "center",
    "ceo", "cern", "cfa", "cfd", "chanel", "channel", "chat", "cheap",
    "chloe", "christmas", "chrome", "church", "cisco", "citic", "city",
    "claims", "cleaning", "click", "clinic", "clothing", "cloud", "club",
    "coach", "codes", "coffee", "college", "cologne", "com", "commbank",
    "community", "company", "computer", "condos", "construction", "consulting",
    "contractors", "cooking", "cool", "coop", "corsica", "country", "coupons",
    "courses", "credit", "creditcard", "cricket", "crown", "crs", "cruises",
    "cuisinella", "cymru", "cyou", "dabur", "dad", "dance", "date", "dating",
    "datsun", "day", "dclk", "deals", "degree", "delivery", "delta",
    "democrat", "dental", "dentist", "desi", "design", "dev", "diamonds",
    "diet", "digital", "direct", "directory", "discount", "dnp", "docs",
    "dog", "doha", "domains", "doosan", "download", "drive", "durban", "dvag",
    "earth", "eat", "edu", "education", "email", "emerck", "energy",
    "engineer", "engineering", "enterprises", "epson", "equipment", "erni",
    "esq", "estate", "eurovision", "eus", "events", "everbank", "exchange",
    "expert", "exposed", "express", "fage", "fail", "faith", "family", "fan",
    "fans", "farm", "fashion", "feedback", "film", "finance", "financial",
    "firmdale", "fish", "fishing", "fit", "fitness", "flights", "florist",
    "flowers", "flsmidth", "fly", "foo", "football", "forex", "forsale",
    "forum", "foundation", "frl", "frogans", "fund", "furniture", "futbol",
    "fyi", "gal", "gallery", "game", "garden", "gbiz", "gdn", "gent",
    "genting", "ggee", "gift", "gifts", "gives", "giving", "glass", "gle",
    "global", "globo", "gmail", "gmo", "gmx", "gold", "goldpoint", "golf",
    "goo", "goog", "google", "gop", "gov", "graphics", "gratis", "green",
    "gripe", "group", "guge", "guide", "guitars", "guru", "hamburg", "hangout",
    "haus", "healthcare", "help", "here", "hermes", "hiphop", "hitachi", "hiv",
    "hockey", "holdings", "holiday", "homedepot", "homes", "honda", "horse",
    "host", "hosting", "hoteles", "hotmail", "house", "how", "hsbc", "ibm",
    "icbc", "ice", "icu", "ifm", "iinet", "immo", "immobilien", "industries",
    "infiniti", "info", "ing", "ink", "institute", "insure", "int",
    "international", "investments", "ipiranga", "irish", "ist", "istanbul",
    "itau", "iwc", "java", "jcb", "jetzt", "jewelry", "jlc", "jll", "jobs",
    "joburg", "jprs", "juegos", "kaufen", "kddi", "kim", "kitchen", "kiwi",
    "koeln", "komatsu", "krd", "kred", "kyoto", "lacaixa", "lancaster", "land",
    "lasalle", "lat", "latrobe", "law", "lawyer", "lds", "lease", "leclerc",
    "legal", "lexus", "lgbt", "liaison", "lidl", "life", "lighting", "limited",
    "limo", "link", "live", "lixil", "loan", "loans", "lol", "london", "lotte",
    "lotto", "love", "ltda", "lupin", "luxe", "luxury", "madrid", "maif",
    "maison", "man", "management", "mango", "market", "marketing", "markets",
    "marriott", "mba", "media", "meet", "melbourne", "meme", "memorial", "men",
    "menu", "miami", "microsoft", "mil", "mini", "mma", "mobi", "moda", "moe",
    "mom", "monash", "money", "montblanc", "mormon", "mortgage", "moscow",
    "motorcycles", "mov", "movie", "movistar", "mtn", "mtpc", "museum",
    "nadex", "nagoya", "name", "navy", "nec", "net", "netbank", "network",
    "neustar", "new", "news", "nexus", "ngo", "nhk", "nico", "ninja", "nissan",
    "nokia", "nra", "nrw", "ntt", "nyc", "office", "okinawa", "omega", "one",
    "ong", "onl", "online", "ooo", "oracle", "orange", "org", "organic",
    "osaka", "otsuka", "ovh", "page", "panerai", "paris", "partners", "parts",
    "party", "pet", "pharmacy", "philips", "photo", "photography", "photos",
    "physio", "piaget", "pics", "pictet", "pictures", "pink", "pizza", "place",
    "play", "plumbing", "plus", "pohl", "poker", "porn", "post", "praxi",
    "press", "pro", "prod", "productions", "prof", "properties", "property",
    "pub", "qpon", "quebec", "racing", "realtor", "realty", "recipes", "red",
    "redstone", "rehab", "reise", "reisen", "reit", "ren", "rent", "rentals",
    "repair", "report", "republican", "rest", "restaurant", "review",
    "reviews", "rich", "ricoh", "rio", "rip", "rocks", "rodeo", "rsvp", "ruhr",
    "run", "ryukyu", "saarland", "sakura", "sale", "samsung", "sandvik",
    "sandvikcoromant", "sanofi", "sap", "sarl", "saxo", "sca", "scb",
    "schmidt", "scholarships", "school", "schule", "schwarz", "science",
    "scor", "scot", "seat", "seek", "sener", "services", "sew", "sex", "sexy",
    "shiksha", "shoes", "show", "shriram", "singles", "site", "ski", "sky",
    "skype", "sncf", "soccer", "social", "software", "sohu", "solar",
    "solutions", "sony", "soy", "space", "spiegel", "spreadbetting", "srl",
    "starhub", "statoil", "studio", "study", "style", "sucks", "supplies",
    "supply", "support", "surf", "surgery", "suzuki", "swatch", "swiss",
    "sydney", "systems", "taipei", "tatamotors", "tatar", "tattoo", "tax",
    "taxi", "team", "tech", "technology", "tel", "telefonica", "temasek",
    "tennis", "thd", "theater", "tickets", "tienda", "tips", "tires", "tirol",
    "today", "tokyo", "tools", "top", "toray", "toshiba", "tours", "town",
    "toyota", "toys", "trade", "trading", "training", "travel", "trust", "tui",
    "ubs", "university", "uno", "uol", "vacations", "vegas", "ventures",
    "vermögensberater", "vermögensberatung", "versicherung", "vet", "viajes",
    "video", "villas", "vin", "vision", "vista", "vistaprint", "vlaanderen",
    "vodka", "vote", "voting", "voto", "voyage", "wales", "walter", "wang",
    "watch", "webcam", "website", "wed", "wedding", "weir", "whoswho", "wien",
    "wiki", "williamhill", "win", "windows", "wine", "wme", "work", "works",
    "world", "wtc", "wtf", "xbox", "xerox", "xin", "xperia", "xxx", "xyz",
    "yachts", "yandex", "yodobashi", "yoga", "yokohama", "youtube", "zip",
    "zone", "zuerich", "дети", "ком", "москва", "онлайн", "орг", "рус", "сайт",
    "קום", "بازار", "شبكة", "كوم", "موقع", "कॉम", "नेट", "संगठन", "คอม",
    "みんな", "グーグル", "コム", "世界", "中信", "中文网", "企业", "佛山", "信息",
    "健康", "八卦", "公司", "公益", "商城", "商店", "商标", "在线", "大拿", "娱乐",
    "工行", "广东", "慈善", "我爱你", "手机", "政务", "政府", "新闻", "时尚", "机构",
    "淡马锡", "游戏", "点看", "移动", "组织机构", "网址", "网店", "网络", "谷歌", "集团",
    "飞利浦", "餐厅", "닷넷", "닷컴", "삼성", "onion"]

# Rough URL matcher used to estimate how many characters Twitter will
# replace with a t.co short link when computing tweet length.
# BUGFIX: the original pattern used r'www\\.' which, inside a raw string,
# matches a literal backslash followed by any character rather than "www.";
# it is now r'www\.' as intended.
URL_REGEXP = re.compile(
    r'(?i)((?:https?://|www\.)*'   # optional scheme or leading "www."
    r'(?:[\w+-_]+[.])'             # host label followed by a dot
    r'(?:' + r'\b|'.join(TLDS) +   # a known TLD...
    r'\b|(?:[0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5]))+'  # ...or an IPv4 octet
    r'(?:[:\w+\/]?[a-z0-9!\*\'\(\);:&=\+\$/%#\[\]\-_\.,~?])*)',   # port/path/query characters
    re.UNICODE)
def calc_expected_status_length(status, short_url_length=23):
    """Calculate the length of a tweet, taking into account Twitter's
    replacement of URLs with https://t.co links.

    Args:
        status: text of the status message to be posted.
        short_url_length: the current published https://t.co link length.

    Returns:
        Expected length of the status message as an integer.
    """
    found_urls = re.findall(URL_REGEXP, status)
    if not found_urls:
        return len(status)
    # Each URL is removed from the count and replaced by one t.co link.
    chars_in_urls = len(''.join(found_urls))
    return len(status) - chars_in_urls + short_url_length * len(found_urls)
def is_url(text):
    """Check whether a bit of text is a URL.

    Args:
        text: text to check.

    Returns:
        Boolean of whether the text should be treated as a URL or not.
    """
    # Idiom fix: return the boolean directly instead of an if/else that
    # returns True/False.
    return bool(re.findall(URL_REGEXP, text))
def http_to_file(http):
    """Download the resource at the given URL into a NamedTemporaryFile.

    Args:
        http: URL of the resource to fetch.

    Returns:
        A NamedTemporaryFile containing the raw response body.
    """
    temp = NamedTemporaryFile()
    response = requests.get(http, stream=True)
    temp.write(response.raw.data)
    return temp
def parse_media_file(passed_media):
    """Parse a media file and attempt to return a file-like object and
    information about the media file.

    Args:
        passed_media: media file to parse — either a URL string, a local
            file path string, or a file object opened in 'rb' mode.

    Returns:
        file-like object, the filename of the media file, the file size, and
        the type of media.

    Raises:
        TwitterError: if a passed file object is not opened 'rb', if the
            media type cannot be determined, or if the size limits
            (5MB for images, 15MB for videos) are exceeded.
    """
    img_formats = ['image/jpeg',
                   'image/png',
                   'image/gif',
                   'image/bmp',
                   'image/webp']
    video_formats = ['video/mp4']

    # If passed_media is a string, check if it points to a URL, otherwise,
    # it should point to local file. Create a reference to a file obj for
    # each case such that data_file ends up with a read() method.
    if not hasattr(passed_media, 'read'):
        if passed_media.startswith('http'):
            data_file = http_to_file(passed_media)
        else:
            data_file = open(os.path.realpath(passed_media), 'rb')
        filename = os.path.basename(passed_media)

    # Otherwise, if a file object was passed in the first place,
    # create the standard reference to media_file (i.e., rename it to fp).
    else:
        if passed_media.mode != 'rb':
            raise TwitterError({'message': 'File mode must be "rb".'})
        filename = os.path.basename(passed_media.name)
        data_file = passed_media

    # Determine the size by seeking to the end; then rewind so the caller
    # reads from the start.
    data_file.seek(0, 2)
    file_size = data_file.tell()
    try:
        data_file.seek(0)
    except (OSError, ValueError):
        # BUGFIX: was a bare `except:` that swallowed everything, including
        # KeyboardInterrupt. Unseekable streams raise
        # io.UnsupportedOperation, which subclasses both OSError and
        # ValueError; best-effort rewind is preserved for those only.
        pass

    # filename is already a basename in every branch above.
    media_type = mimetypes.guess_type(filename)[0]

    if media_type in img_formats and file_size > 5 * 1048576:
        raise TwitterError({'message': 'Images must be less than 5MB.'})
    elif media_type in video_formats and file_size > 15 * 1048576:
        raise TwitterError({'message': 'Videos must be less than 15MB.'})
    elif media_type not in img_formats and media_type not in video_formats:
        raise TwitterError({'message': 'Media type could not be determined.'})

    return data_file, filename, file_size, media_type
def enf_type(field, _type, val):
    """Check that a given val for a field (i.e., the name of the field)
    is of the proper _type. If it is not, raise a TwitterError with a brief
    explanation.

    Args:
        field:
            Name of the field you are checking.
        _type:
            Type that the value should be returned as.
        val:
            Value to convert to _type.

    Returns:
        val converted to type _type.

    Raises:
        TwitterError: if val cannot be converted to _type.
    """
    try:
        return _type(val)
    except (ValueError, TypeError):
        # BUGFIX: only ValueError was caught before, so e.g. int(None)
        # raised a bare TypeError instead of the intended TwitterError.
        raise TwitterError({
            'message': '"{0}" must be type {1}'.format(field, _type.__name__)
        })