Update profilehooks to 1.12.0

* Fixes Tautulli/Tautulli-Issues#275
This commit is contained in:
JonnyWong16 2020-09-06 13:45:01 -07:00
parent 55ffd54e5b
commit 0902a61341
No known key found for this signature in database
GPG key ID: B1F1F9807184697A

View file

@ -7,7 +7,7 @@ coverage reports. There's a third convenient decorator (`timecall`) that
measures the duration of function execution without the extra profiling measures the duration of function execution without the extra profiling
overhead. overhead.
Usage example (Python 2.4 or newer):: Usage example::
from profilehooks import profile, coverage from profilehooks import profile, coverage
@ -16,20 +16,18 @@ Usage example (Python 2.4 or newer)::
if n < 2: return 1 if n < 2: return 1
else: return n * fn(n-1) else: return n * fn(n-1)
print fn(42) print(fn(42))
Usage example (Python 2.3 or older):: Or without imports, with some hack
from profilehooks import profile, coverage $ python -m profilehooks yourmodule
@profile # or @coverage
def fn(n): def fn(n):
if n < 2: return 1 if n < 2: return 1
else: return n * fn(n-1) else: return n * fn(n-1)
# Now wrap that function in a decorator print(fn(42))
fn = profile(fn) # or coverage(fn)
print fn(42)
Reports for all thusly decorated functions will be printed to sys.stdout Reports for all thusly decorated functions will be printed to sys.stdout
on program termination. You can alternatively request for immediate on program termination. You can alternatively request for immediate
@ -42,7 +40,7 @@ instead of a detailed (but costly) profile.
Caveats Caveats
A thread on python-dev convinced me that hotshot produces bogus numbers. A thread on python-dev convinced me that hotshot produces bogus numbers.
See http://mail.python.org/pipermail/python-dev/2005-November/058264.html See https://mail.python.org/pipermail/python-dev/2005-November/058264.html
I don't know what will happen if a decorated function will try to call I don't know what will happen if a decorated function will try to call
another decorated function. All decorators probably need to explicitly another decorated function. All decorators probably need to explicitly
@ -62,7 +60,7 @@ Caveats
executed. For this reason coverage analysis now uses trace.py which is executed. For this reason coverage analysis now uses trace.py which is
slower, but more accurate. slower, but more accurate.
Copyright (c) 2004--2008 Marius Gedminas <marius@pov.lt> Copyright (c) 2004--2020 Marius Gedminas <marius@gedmin.as>
Copyright (c) 2007 Hanno Schlichting Copyright (c) 2007 Hanno Schlichting
Copyright (c) 2008 Florian Schulze Copyright (c) 2008 Florian Schulze
@ -88,24 +86,30 @@ Released under the MIT licence since December 2006:
(Previously it was distributed under the GNU General Public Licence.) (Previously it was distributed under the GNU General Public Licence.)
""" """
# $Id: profilehooks.py 29 2010-08-13 16:29:20Z mg $ from __future__ import print_function
__author__ = "Marius Gedminas (marius@gedmin.as)" __author__ = "Marius Gedminas <marius@gedmin.as>"
__copyright__ = "Copyright 2004-2009 Marius Gedminas" __copyright__ = "Copyright 2004-2020 Marius Gedminas and contributors"
__license__ = "MIT" __license__ = "MIT"
__version__ = "1.4" __version__ = '1.12.0'
__date__ = "2009-03-31" __date__ = "2020-08-20"
import atexit import atexit
import functools
import inspect import inspect
import sys import logging
import os
import re import re
import sys
# For profiling # For profiling
from profile import Profile from profile import Profile
import pstats import pstats
# For timecall
import timeit
# For hotshot profiling (inaccurate!) # For hotshot profiling (inaccurate!)
try: try:
import hotshot import hotshot
@ -115,6 +119,9 @@ except ImportError:
# For trace.py coverage # For trace.py coverage
import trace import trace
import dis
import token
import tokenize
# For hotshot coverage (inaccurate!; uses undocumented APIs; might break) # For hotshot coverage (inaccurate!; uses undocumented APIs; might break)
if hotshot is not None: if hotshot is not None:
@ -127,24 +134,55 @@ try:
except ImportError: except ImportError:
cProfile = None cProfile = None
# For timecall
import time
# registry of available profilers # registry of available profilers
AVAILABLE_PROFILERS = {} AVAILABLE_PROFILERS = {}
__all__ = ['coverage', 'coverage_with_hotshot', 'profile', 'timecall']
# Use tokenize.open() on Python >= 3.2, fall back to open() on Python 2
tokenize_open = getattr(tokenize, 'open', open)
def _unwrap(fn):
# inspect.unwrap() doesn't exist on Python 2
if not hasattr(fn, '__wrapped__'):
return fn
else:
# intentionally using recursion here instead of a while loop to
# make cycles fail with a recursion error instead of looping forever.
return _unwrap(fn.__wrapped__)
def _identify(fn):
fn = _unwrap(fn)
funcname = fn.__name__
filename = fn.__code__.co_filename
lineno = fn.__code__.co_firstlineno
return (funcname, filename, lineno)
def _is_file_like(o):
return hasattr(o, 'write')
def profile(fn=None, skip=0, filename=None, immediate=False, dirs=False, def profile(fn=None, skip=0, filename=None, immediate=False, dirs=False,
sort=None, entries=40, sort=None, entries=40,
profiler=('cProfile', 'profile', 'hotshot')): profiler=('cProfile', 'profile', 'hotshot'),
stdout=True):
"""Mark `fn` for profiling. """Mark `fn` for profiling.
If `skip` is > 0, first `skip` calls to `fn` will not be profiled. If `skip` is > 0, first `skip` calls to `fn` will not be profiled.
If `stdout` is not file-like and truthy, output will be printed to
sys.stdout. If it is a file-like object, output will be printed to it
instead. `stdout` must be writable in text mode (as opposed to binary)
if it is file-like.
If `immediate` is False, profiling results will be printed to If `immediate` is False, profiling results will be printed to
sys.stdout on program termination. Otherwise results will be printed self.stdout on program termination. Otherwise results will be printed
after each call. after each call. (If you don't want this, set stdout=False and specify a
`filename` to store profile data.)
If `dirs` is False only the name of the file will be printed. If `dirs` is False only the name of the file will be printed.
Otherwise the full path is used. Otherwise the full path is used.
@ -170,7 +208,8 @@ def profile(fn=None, skip=0, filename=None, immediate=False, dirs=False,
'profile', 'hotshot'). 'profile', 'hotshot').
If `filename` is specified, the profile stats will be stored in the If `filename` is specified, the profile stats will be stored in the
named file. You can load them pstats.Stats(filename). named file. You can load them with pstats.Stats(filename) or use a
visualization tool like RunSnakeRun.
Usage:: Usage::
@ -192,12 +231,12 @@ def profile(fn=None, skip=0, filename=None, immediate=False, dirs=False,
... ...
""" """
if fn is None: # @profile() syntax -- we are a decorator maker if fn is None: # @profile() syntax -- we are a decorator maker
def decorator(fn): def decorator(fn):
return profile(fn, skip=skip, filename=filename, return profile(fn, skip=skip, filename=filename,
immediate=immediate, dirs=dirs, immediate=immediate, dirs=dirs,
sort=sort, entries=entries, sort=sort, entries=entries,
profiler=profiler) profiler=profiler, stdout=stdout)
return decorator return decorator
# @profile syntax -- we are a decorator. # @profile syntax -- we are a decorator.
if isinstance(profiler, str): if isinstance(profiler, str):
@ -208,20 +247,16 @@ def profile(fn=None, skip=0, filename=None, immediate=False, dirs=False,
break break
else: else:
raise ValueError('only these profilers are available: %s' raise ValueError('only these profilers are available: %s'
% ', '.join(AVAILABLE_PROFILERS)) % ', '.join(sorted(AVAILABLE_PROFILERS)))
fp = profiler_class(fn, skip=skip, filename=filename, fp = profiler_class(fn, skip=skip, filename=filename,
immediate=immediate, dirs=dirs, immediate=immediate, dirs=dirs,
sort=sort, entries=entries) sort=sort, entries=entries, stdout=stdout)
# fp = HotShotFuncProfile(fn, skip=skip, filename=filename, ...)
# or HotShotFuncProfile
# We cannot return fp or fp.__call__ directly as that would break method # We cannot return fp or fp.__call__ directly as that would break method
# definitions, instead we need to return a plain function. # definitions, instead we need to return a plain function.
@functools.wraps(fn)
def new_fn(*args, **kw): def new_fn(*args, **kw):
return fp(*args, **kw) return fp(*args, **kw)
new_fn.__doc__ = fn.__doc__
new_fn.__name__ = fn.__name__
new_fn.__dict__ = fn.__dict__
new_fn.__module__ = fn.__module__
return new_fn return new_fn
@ -244,15 +279,13 @@ def coverage(fn):
... ...
""" """
fp = TraceFuncCoverage(fn) # or HotShotFuncCoverage fp = TraceFuncCoverage(fn) # or HotShotFuncCoverage
# We cannot return fp or fp.__call__ directly as that would break method # We cannot return fp or fp.__call__ directly as that would break method
# definitions, instead we need to return a plain function. # definitions, instead we need to return a plain function.
@functools.wraps(fn)
def new_fn(*args, **kw): def new_fn(*args, **kw):
return fp(*args, **kw) return fp(*args, **kw)
new_fn.__doc__ = fn.__doc__
new_fn.__name__ = fn.__name__
new_fn.__dict__ = fn.__dict__
new_fn.__module__ = fn.__module__
return new_fn return new_fn
@ -268,12 +301,10 @@ def coverage_with_hotshot(fn):
fp = HotShotFuncCoverage(fn) fp = HotShotFuncCoverage(fn)
# We cannot return fp or fp.__call__ directly as that would break method # We cannot return fp or fp.__call__ directly as that would break method
# definitions, instead we need to return a plain function. # definitions, instead we need to return a plain function.
@functools.wraps(fn)
def new_fn(*args, **kw): def new_fn(*args, **kw):
return fp(*args, **kw) return fp(*args, **kw)
new_fn.__doc__ = fn.__doc__
new_fn.__name__ = fn.__name__
new_fn.__dict__ = fn.__dict__
new_fn.__module__ = fn.__module__
return new_fn return new_fn
@ -286,7 +317,7 @@ class FuncProfile(object):
Profile = Profile Profile = Profile
def __init__(self, fn, skip=0, filename=None, immediate=False, dirs=False, def __init__(self, fn, skip=0, filename=None, immediate=False, dirs=False,
sort=None, entries=40): sort=None, entries=40, stdout=True):
"""Creates a profiler for a function. """Creates a profiler for a function.
Every profiler has its own log file (the name of which is derived Every profiler has its own log file (the name of which is derived
@ -298,14 +329,21 @@ class FuncProfile(object):
self.fn = fn self.fn = fn
self.skip = skip self.skip = skip
self.filename = filename self.filename = filename
self.immediate = immediate self._immediate = immediate
self.stdout = stdout
self._stdout_is_fp = self.stdout and _is_file_like(self.stdout)
self.dirs = dirs self.dirs = dirs
self.sort = sort or ('cumulative', 'time', 'calls') self.sort = sort or ('cumulative', 'time', 'calls')
if isinstance(self.sort, str): if isinstance(self.sort, str):
self.sort = (self.sort, ) self.sort = (self.sort, )
self.entries = entries self.entries = entries
self.reset_stats() self.reset_stats()
atexit.register(self.atexit) if not self.immediate:
atexit.register(self.atexit)
@property
def immediate(self):
return self._immediate
def __call__(self, *args, **kw): def __call__(self, *args, **kw):
"""Profile a singe call to the function.""" """Profile a singe call to the function."""
@ -332,40 +370,45 @@ class FuncProfile(object):
def print_stats(self): def print_stats(self):
"""Print profile information to sys.stdout.""" """Print profile information to sys.stdout."""
funcname = self.fn.__name__
filename = self.fn.func_code.co_filename
lineno = self.fn.func_code.co_firstlineno
print
print "*** PROFILER RESULTS ***"
print "%s (%s:%s)" % (funcname, filename, lineno)
print "function called %d times" % self.ncalls,
if self.skipped:
print "(%d calls not profiled)" % self.skipped
else:
print
print
stats = self.stats stats = self.stats
if self.filename: if self.filename:
stats.dump_stats(self.filename) stats.dump_stats(self.filename)
if not self.dirs: if self.stdout:
stats.strip_dirs() funcname, filename, lineno = _identify(self.fn)
stats.sort_stats(*self.sort) print_f = print
stats.print_stats(self.entries) if self._stdout_is_fp:
print_f = functools.partial(print, file=self.stdout)
print_f("")
print_f("*** PROFILER RESULTS ***")
print_f("%s (%s:%s)" % (funcname, filename, lineno))
if self.skipped:
skipped = " (%d calls not profiled)" % self.skipped
else:
skipped = ""
print_f("function called %d times%s" % (self.ncalls, skipped))
print_f("")
if not self.dirs:
stats.strip_dirs()
stats.sort_stats(*self.sort)
stats.print_stats(self.entries)
def reset_stats(self): def reset_stats(self):
"""Reset accumulated profiler statistics.""" """Reset accumulated profiler statistics."""
# send stats printing to specified stdout if it's file-like
stream = self.stdout if self._stdout_is_fp else sys.stdout
# Note: not using self.Profile, since pstats.Stats() fails then # Note: not using self.Profile, since pstats.Stats() fails then
self.stats = pstats.Stats(Profile()) self.stats = pstats.Stats(Profile(), stream=stream)
self.ncalls = 0 self.ncalls = 0
self.skipped = 0 self.skipped = 0
def atexit(self): def atexit(self):
"""Stop profiling and print profile information to sys.stdout. """Stop profiling and print profile information to sys.stdout or self.stdout.
This function is registered as an atexit hook. This function is registered as an atexit hook.
""" """
if not self.immediate: self.print_stats()
self.print_stats()
AVAILABLE_PROFILERS['profile'] = FuncProfile AVAILABLE_PROFILERS['profile'] = FuncProfile
@ -383,13 +426,14 @@ if cProfile is not None:
if hotshot is not None: if hotshot is not None:
class HotShotFuncProfile(object): class HotShotFuncProfile(FuncProfile):
"""Profiler for a function (uses hotshot).""" """Profiler for a function (uses hotshot)."""
# This flag is shared between all instances # This flag is shared between all instances
in_profiler = False in_profiler = False
def __init__(self, fn, skip=0, filename=None): def __init__(self, fn, skip=0, filename=None, immediate=False,
dirs=False, sort=None, entries=40, stdout=True):
"""Creates a profiler for a function. """Creates a profiler for a function.
Every profiler has its own log file (the name of which is derived Every profiler has its own log file (the name of which is derived
@ -401,17 +445,13 @@ if hotshot is not None:
The log file is not removed and remains there to clutter the The log file is not removed and remains there to clutter the
current working directory. current working directory.
""" """
self.fn = fn if filename:
self.filename = filename
if self.filename:
self.logfilename = filename + ".raw" self.logfilename = filename + ".raw"
else: else:
self.logfilename = fn.__name__ + ".prof" self.logfilename = "%s.%d.prof" % (fn.__name__, os.getpid())
self.profiler = hotshot.Profile(self.logfilename) super(HotShotFuncProfile, self).__init__(
self.ncalls = 0 fn, skip=skip, filename=filename, immediate=immediate,
self.skip = skip dirs=dirs, sort=sort, entries=entries, stdout=stdout)
self.skipped = 0
atexit.register(self.atexit)
def __call__(self, *args, **kw): def __call__(self, *args, **kw):
"""Profile a singe call to the function.""" """Profile a singe call to the function."""
@ -423,43 +463,32 @@ if hotshot is not None:
if HotShotFuncProfile.in_profiler: if HotShotFuncProfile.in_profiler:
# handle recursive calls # handle recursive calls
return self.fn(*args, **kw) return self.fn(*args, **kw)
if self.profiler is None:
self.profiler = hotshot.Profile(self.logfilename)
try: try:
HotShotFuncProfile.in_profiler = True HotShotFuncProfile.in_profiler = True
return self.profiler.runcall(self.fn, *args, **kw) return self.profiler.runcall(self.fn, *args, **kw)
finally: finally:
HotShotFuncProfile.in_profiler = False HotShotFuncProfile.in_profiler = False
if self.immediate:
self.print_stats()
self.reset_stats()
def atexit(self): def print_stats(self):
"""Stop profiling and print profile information to sys.stderr. if self.profiler is None:
self.stats = pstats.Stats(Profile())
This function is registered as an atexit hook.
"""
self.profiler.close()
funcname = self.fn.__name__
filename = self.fn.func_code.co_filename
lineno = self.fn.func_code.co_firstlineno
print
print "*** PROFILER RESULTS ***"
print "%s (%s:%s)" % (funcname, filename, lineno)
print "function called %d times" % self.ncalls,
if self.skipped:
print "(%d calls not profiled)" % self.skipped
else: else:
print self.profiler.close()
print self.stats = hotshot.stats.load(self.logfilename)
stats = hotshot.stats.load(self.logfilename) super(HotShotFuncProfile, self).print_stats()
# hotshot.stats.load takes ages, and the .prof file eats megabytes, but
# a saved stats object is small and fast def reset_stats(self):
if self.filename: self.profiler = None
stats.dump_stats(self.filename) self.ncalls = 0
# it is best to save before strip_dirs self.skipped = 0
stats.strip_dirs()
stats.sort_stats('cumulative', 'time', 'calls')
stats.print_stats(40)
AVAILABLE_PROFILERS['hotshot'] = HotShotFuncProfile AVAILABLE_PROFILERS['hotshot'] = HotShotFuncProfile
class HotShotFuncCoverage: class HotShotFuncCoverage:
"""Coverage analysis for a function (uses _hotshot). """Coverage analysis for a function (uses _hotshot).
@ -482,7 +511,7 @@ if hotshot is not None:
current working directory. current working directory.
""" """
self.fn = fn self.fn = fn
self.logfilename = fn.__name__ + ".cprof" self.logfilename = "%s.%d.cprof" % (fn.__name__, os.getpid())
self.profiler = _hotshot.coverage(self.logfilename) self.profiler = _hotshot.coverage(self.logfilename)
self.ncalls = 0 self.ncalls = 0
atexit.register(self.atexit) atexit.register(self.atexit)
@ -490,7 +519,11 @@ if hotshot is not None:
def __call__(self, *args, **kw): def __call__(self, *args, **kw):
"""Profile a singe call to the function.""" """Profile a singe call to the function."""
self.ncalls += 1 self.ncalls += 1
return self.profiler.runcall(self.fn, args, kw) old_trace = sys.gettrace()
try:
return self.profiler.runcall(self.fn, args, kw)
finally: # pragma: nocover
sys.settrace(old_trace)
def atexit(self): def atexit(self):
"""Stop profiling and print profile information to sys.stderr. """Stop profiling and print profile information to sys.stderr.
@ -498,14 +531,12 @@ if hotshot is not None:
This function is registered as an atexit hook. This function is registered as an atexit hook.
""" """
self.profiler.close() self.profiler.close()
funcname = self.fn.__name__ funcname, filename, lineno = _identify(self.fn)
filename = self.fn.func_code.co_filename print("")
lineno = self.fn.func_code.co_firstlineno print("*** COVERAGE RESULTS ***")
print print("%s (%s:%s)" % (funcname, filename, lineno))
print "*** COVERAGE RESULTS ***" print("function called %d times" % self.ncalls)
print "%s (%s:%s)" % (funcname, filename, lineno) print("")
print "function called %d times" % self.ncalls
print
fs = FuncSource(self.fn) fs = FuncSource(self.fn)
reader = hotshot.log.LogReader(self.logfilename) reader = hotshot.log.LogReader(self.logfilename)
for what, (filename, lineno, funcname), tdelta in reader: for what, (filename, lineno, funcname), tdelta in reader:
@ -514,15 +545,19 @@ if hotshot is not None:
if what == hotshot.log.LINE: if what == hotshot.log.LINE:
fs.mark(lineno) fs.mark(lineno)
if what == hotshot.log.ENTER: if what == hotshot.log.ENTER:
# hotshot gives us the line number of the function definition # hotshot gives us the line number of the function
# and never gives us a LINE event for the first statement in # definition and never gives us a LINE event for the first
# a function, so if we didn't perform this mapping, the first # statement in a function, so if we didn't perform this
# statement would be marked as never executed # mapping, the first statement would be marked as never
# executed
if lineno == fs.firstlineno: if lineno == fs.firstlineno:
lineno = fs.firstcodelineno lineno = fs.firstcodelineno
fs.mark(lineno) fs.mark(lineno)
reader.close() reader.close()
print fs print(fs)
never_executed = fs.count_never_executed()
if never_executed:
print("%d lines were not executed." % never_executed)
class TraceFuncCoverage: class TraceFuncCoverage:
@ -552,19 +587,21 @@ class TraceFuncCoverage:
current working directory. current working directory.
""" """
self.fn = fn self.fn = fn
self.logfilename = fn.__name__ + ".cprof" self.logfilename = "%s.%d.cprof" % (fn.__name__, os.getpid())
self.ncalls = 0 self.ncalls = 0
atexit.register(self.atexit) atexit.register(self.atexit)
def __call__(self, *args, **kw): def __call__(self, *args, **kw):
"""Profile a singe call to the function.""" """Profile a singe call to the function."""
self.ncalls += 1 self.ncalls += 1
if TraceFuncCoverage.tracing: if TraceFuncCoverage.tracing: # pragma: nocover
return self.fn(*args, **kw) return self.fn(*args, **kw)
old_trace = sys.gettrace()
try: try:
TraceFuncCoverage.tracing = True TraceFuncCoverage.tracing = True
return self.tracer.runfunc(self.fn, *args, **kw) return self.tracer.runfunc(self.fn, *args, **kw)
finally: finally: # pragma: nocover
sys.settrace(old_trace)
TraceFuncCoverage.tracing = False TraceFuncCoverage.tracing = False
def atexit(self): def atexit(self):
@ -572,23 +609,21 @@ class TraceFuncCoverage:
This function is registered as an atexit hook. This function is registered as an atexit hook.
""" """
funcname = self.fn.__name__ funcname, filename, lineno = _identify(self.fn)
filename = self.fn.func_code.co_filename print("")
lineno = self.fn.func_code.co_firstlineno print("*** COVERAGE RESULTS ***")
print print("%s (%s:%s)" % (funcname, filename, lineno))
print "*** COVERAGE RESULTS ***" print("function called %d times" % self.ncalls)
print "%s (%s:%s)" % (funcname, filename, lineno) print("")
print "function called %d times" % self.ncalls
print
fs = FuncSource(self.fn) fs = FuncSource(self.fn)
for (filename, lineno), count in self.tracer.counts.items(): for (filename, lineno), count in self.tracer.counts.items():
if filename != fs.filename: if filename != fs.filename:
continue continue
fs.mark(lineno, count) fs.mark(lineno, count)
print fs print(fs)
never_executed = fs.count_never_executed() never_executed = fs.count_never_executed()
if never_executed: if never_executed:
print "%d lines were not executed." % never_executed print("%d lines were not executed." % never_executed)
class FuncSource: class FuncSource:
@ -599,22 +634,47 @@ class FuncSource:
def __init__(self, fn): def __init__(self, fn):
self.fn = fn self.fn = fn
self.filename = inspect.getsourcefile(fn) self.filename = inspect.getsourcefile(fn)
self.source, self.firstlineno = inspect.getsourcelines(fn)
self.sourcelines = {} self.sourcelines = {}
self.firstcodelineno = self.firstlineno self.source = []
self.find_source_lines() self.firstlineno = self.firstcodelineno = 0
try:
self.source, self.firstlineno = inspect.getsourcelines(fn)
self.firstcodelineno = self.firstlineno
self.find_source_lines()
except IOError:
self.filename = None
def find_source_lines(self): def find_source_lines(self):
"""Mark all executable source lines in fn as executed 0 times.""" """Mark all executable source lines in fn as executed 0 times."""
strs = trace.find_strings(self.filename) if self.filename is None:
lines = trace.find_lines_from_code(self.fn.func_code, strs) return
self.firstcodelineno = sys.maxint strs = self._find_docstrings(self.filename)
lines = {
ln
for off, ln in dis.findlinestarts(_unwrap(self.fn).__code__)
if ln not in strs
}
for lineno in lines: for lineno in lines:
self.firstcodelineno = min(self.firstcodelineno, lineno)
self.sourcelines.setdefault(lineno, 0) self.sourcelines.setdefault(lineno, 0)
if self.firstcodelineno == sys.maxint: if lines:
self.firstcodelineno = min(lines)
else: # pragma: nocover
# This branch cannot be reached, I'm just being paranoid.
self.firstcodelineno = self.firstlineno self.firstcodelineno = self.firstlineno
def _find_docstrings(self, filename):
# A replacement for trace.find_strings() which was deprecated in
# Python 3.2 and removed in 3.6.
strs = set()
prev = token.INDENT # so module docstring is detected as docstring
with tokenize_open(filename) as f:
tokens = tokenize.generate_tokens(f.readline)
for ttype, tstr, start, end, line in tokens:
if ttype == token.STRING and prev == token.INDENT:
strs.update(range(start[0], end[0] + 1))
prev = ttype
return strs
def mark(self, lineno, count=1): def mark(self, lineno, count=1):
"""Mark a given source line as executed count times. """Mark a given source line as executed count times.
@ -635,6 +695,8 @@ class FuncSource:
def __str__(self): def __str__(self):
"""Return annotated source code for the function.""" """Return annotated source code for the function."""
if self.filename is None:
return "cannot show coverage data since co_filename is None"
lines = [] lines = []
lineno = self.firstlineno lineno = self.firstlineno
for line in self.source: for line in self.source:
@ -642,7 +704,10 @@ class FuncSource:
if counter is None: if counter is None:
prefix = ' ' * 7 prefix = ' ' * 7
elif counter == 0: elif counter == 0:
if self.blank_rx.match(line): if self.blank_rx.match(line): # pragma: nocover
# This is an workaround for an ancient bug I can't
# reproduce, perhaps because it was fixed, or perhaps
# because I can't remember all the details.
prefix = ' ' * 7 prefix = ' ' * 7
else: else:
prefix = '>' * 6 + ' ' prefix = '>' * 6 + ' '
@ -653,7 +718,10 @@ class FuncSource:
return ''.join(lines) return ''.join(lines)
def timecall(fn=None, immediate=True, timer=time.time): def timecall(
fn=None, immediate=True, timer=None,
log_name=None, log_level=logging.DEBUG,
):
"""Wrap `fn` and print its execution time. """Wrap `fn` and print its execution time.
Example:: Example::
@ -665,36 +733,56 @@ def timecall(fn=None, immediate=True, timer=time.time):
somefunc(2, 3) somefunc(2, 3)
will print the time taken by somefunc on every call. If you want just will print the time taken by somefunc on every call. If you want just
a summary at program termination, use a summary at program termination, use ::
@timecall(immediate=False) @timecall(immediate=False)
You can also choose a timing method other than the default ``time.time()``, You can also choose a timing method other than the default
e.g.: ``timeit.default_timer()``, e.g.::
@timecall(timer=time.clock) @timecall(timer=time.clock)
You can also log the output to a logger by specifying the name and level
of the logger to use, eg:
@timecall(immediate=True,
log_name='profile_log',
log_level=logging.DEBUG)
""" """
if fn is None: # @timecall() syntax -- we are a decorator maker if fn is None: # @timecall() syntax -- we are a decorator maker
def decorator(fn): def decorator(fn):
return timecall(fn, immediate=immediate, timer=timer) return timecall(
fn, immediate=immediate, timer=timer,
log_name=log_name, log_level=log_level,
)
return decorator return decorator
# @timecall syntax -- we are a decorator. # @timecall syntax -- we are a decorator.
fp = FuncTimer(fn, immediate=immediate, timer=timer) if timer is None:
timer = timeit.default_timer
fp = FuncTimer(
fn, immediate=immediate, timer=timer,
log_name=log_name, log_level=log_level,
)
# We cannot return fp or fp.__call__ directly as that would break method # We cannot return fp or fp.__call__ directly as that would break method
# definitions, instead we need to return a plain function. # definitions, instead we need to return a plain function.
@functools.wraps(fn)
def new_fn(*args, **kw): def new_fn(*args, **kw):
return fp(*args, **kw) return fp(*args, **kw)
new_fn.__doc__ = fn.__doc__
new_fn.__name__ = fn.__name__
new_fn.__dict__ = fn.__dict__
new_fn.__module__ = fn.__module__
return new_fn return new_fn
class FuncTimer(object): class FuncTimer(object):
def __init__(self, fn, immediate, timer): def __init__(
self, fn, immediate, timer,
log_name=None, log_level=logging.DEBUG,
):
self.logger = None
if log_name:
self.logger = logging.getLogger(log_name)
self.log_level = log_level
self.fn = fn self.fn = fn
self.ncalls = 0 self.ncalls = 0
self.totaltime = 0 self.totaltime = 0
@ -708,25 +796,57 @@ class FuncTimer(object):
fn = self.fn fn = self.fn
timer = self.timer timer = self.timer
self.ncalls += 1 self.ncalls += 1
start = timer()
try: try:
start = timer()
return fn(*args, **kw) return fn(*args, **kw)
finally: finally:
duration = timer() - start duration = timer() - start
self.totaltime += duration self.totaltime += duration
if self.immediate: if self.immediate:
funcname = fn.__name__ funcname, filename, lineno = _identify(fn)
filename = fn.func_code.co_filename message = "%s (%s:%s):\n %.3f seconds\n\n" % (
lineno = fn.func_code.co_firstlineno funcname, filename, lineno, duration,
print >> sys.stderr, "\n %s (%s:%s):\n %.3f seconds\n" % ( )
funcname, filename, lineno, duration) if self.logger:
self.logger.log(self.log_level, message)
else:
sys.stderr.write("\n " + message)
sys.stderr.flush()
def atexit(self): def atexit(self):
if not self.ncalls: if not self.ncalls:
return return
funcname = self.fn.__name__ funcname, filename, lineno = _identify(self.fn)
filename = self.fn.func_code.co_filename message = "\n %s (%s:%s):\n"\
lineno = self.fn.func_code.co_firstlineno " %d calls, %.3f seconds (%.3f seconds per call)\n" % (
print ("\n %s (%s:%s):\n" funcname, filename, lineno, self.ncalls,
" %d calls, %.3f seconds (%.3f seconds per call)\n" % ( self.totaltime, self.totaltime / self.ncalls)
funcname, filename, lineno, self.ncalls, if self.logger:
self.totaltime, self.totaltime / self.ncalls)) self.logger.log(self.log_level, message)
else:
print(message)
if __name__ == '__main__':
local = dict((name, globals()[name]) for name in __all__)
message = """********
Injected `profilehooks`
--------
{}
********
""".format("\n".join(local.keys()))
def interact_():
from code import interact
interact(message, local=local)
def run_():
from runpy import run_module
print(message)
run_module(sys.argv[1], init_globals=local)
if len(sys.argv) == 1:
interact_()
else:
run_()