Update vendored windows libs

This commit is contained in:
Labrys of Knossos 2022-11-28 05:59:32 -05:00
commit b1cefa94e5
226 changed files with 33472 additions and 11882 deletions

View file

@ -1,7 +1,3 @@
#!/usr/bin/env python
from __future__ import print_function
import os
import sys
import operator
@ -9,14 +5,17 @@ import collections
import functools
import stat
from ctypes import (
POINTER, byref, cast, create_unicode_buffer,
create_string_buffer, windll)
POINTER,
byref,
cast,
create_unicode_buffer,
create_string_buffer,
windll,
)
from ctypes.wintypes import LPWSTR
import nt
import posixpath
import six
from six.moves import builtins, filter, map
import builtins
from jaraco.structures import binary
@ -26,476 +25,473 @@ from jaraco.windows import reparse
def mklink():
"""
Like cmd.exe's mklink except it will infer directory status of the
target.
"""
from optparse import OptionParser
parser = OptionParser(usage="usage: %prog [options] link target")
parser.add_option(
'-d', '--directory',
help="Target is a directory (only necessary if not present)",
action="store_true")
options, args = parser.parse_args()
try:
link, target = args
except ValueError:
parser.error("incorrect number of arguments")
symlink(target, link, options.directory)
sys.stdout.write("Symbolic link created: %(link)s --> %(target)s\n" % vars())
"""
Like cmd.exe's mklink except it will infer directory status of the
target.
"""
from optparse import OptionParser
parser = OptionParser(usage="usage: %prog [options] link target")
parser.add_option(
'-d',
'--directory',
help="Target is a directory (only necessary if not present)",
action="store_true",
)
options, args = parser.parse_args()
try:
link, target = args
except ValueError:
parser.error("incorrect number of arguments")
symlink(target, link, options.directory)
sys.stdout.write("Symbolic link created: %(link)s --> %(target)s\n" % vars())
def _is_target_a_directory(link, rel_target):
"""
If creating a symlink from link to a target, determine if target
is a directory (relative to dirname(link)).
"""
target = os.path.join(os.path.dirname(link), rel_target)
return os.path.isdir(target)
"""
If creating a symlink from link to a target, determine if target
is a directory (relative to dirname(link)).
"""
target = os.path.join(os.path.dirname(link), rel_target)
return os.path.isdir(target)
def symlink(target, link, target_is_directory=False):
"""
An implementation of os.symlink for Windows (Vista and greater)
"""
target_is_directory = (
target_is_directory or
_is_target_a_directory(link, target)
)
# normalize the target (MS symlinks don't respect forward slashes)
target = os.path.normpath(target)
handle_nonzero_success(
api.CreateSymbolicLink(link, target, target_is_directory))
"""
An implementation of os.symlink for Windows (Vista and greater)
"""
target_is_directory = target_is_directory or _is_target_a_directory(link, target)
# normalize the target (MS symlinks don't respect forward slashes)
target = os.path.normpath(target)
flags = target_is_directory | api.SYMBOLIC_LINK_FLAG_ALLOW_UNPRIVILEGED_CREATE
handle_nonzero_success(api.CreateSymbolicLink(link, target, flags))
def link(target, link):
"""
Establishes a hard link between an existing file and a new file.
"""
handle_nonzero_success(api.CreateHardLink(link, target, None))
"""
Establishes a hard link between an existing file and a new file.
"""
handle_nonzero_success(api.CreateHardLink(link, target, None))
def is_reparse_point(path):
"""
Determine if the given path is a reparse point.
Return False if the file does not exist or the file attributes cannot
be determined.
"""
res = api.GetFileAttributes(path)
return (
res != api.INVALID_FILE_ATTRIBUTES
and bool(res & api.FILE_ATTRIBUTE_REPARSE_POINT)
)
"""
Determine if the given path is a reparse point.
Return False if the file does not exist or the file attributes cannot
be determined.
"""
res = api.GetFileAttributes(path)
return res != api.INVALID_FILE_ATTRIBUTES and bool(
res & api.FILE_ATTRIBUTE_REPARSE_POINT
)
def islink(path):
"Determine if the given path is a symlink"
return is_reparse_point(path) and is_symlink(path)
"Determine if the given path is a symlink"
return is_reparse_point(path) and is_symlink(path)
def _patch_path(path):
"""
Paths have a max length of api.MAX_PATH characters (260). If a target path
is longer than that, it needs to be made absolute and prepended with
\\?\ in order to work with API calls.
See http://msdn.microsoft.com/en-us/library/aa365247%28v=vs.85%29.aspx for
details.
"""
if path.startswith('\\\\?\\'):
return path
abs_path = os.path.abspath(path)
if not abs_path[1] == ':':
# python doesn't include the drive letter, but \\?\ requires it
abs_path = os.getcwd()[:2] + abs_path
return '\\\\?\\' + abs_path
r"""
Paths have a max length of api.MAX_PATH characters (260). If a target path
is longer than that, it needs to be made absolute and prepended with
\\?\ in order to work with API calls.
See http://msdn.microsoft.com/en-us/library/aa365247%28v=vs.85%29.aspx for
details.
"""
if path.startswith('\\\\?\\'):
return path
abs_path = os.path.abspath(path)
if not abs_path[1] == ':':
# python doesn't include the drive letter, but \\?\ requires it
abs_path = os.getcwd()[:2] + abs_path
return '\\\\?\\' + abs_path
def is_symlink(path):
"""
Assuming path is a reparse point, determine if it's a symlink.
"""
path = _patch_path(path)
try:
return _is_symlink(next(find_files(path)))
except WindowsError as orig_error:
tmpl = "Error accessing {path}: {orig_error.message}"
raise builtins.WindowsError(tmpl.format(**locals()))
"""
Assuming path is a reparse point, determine if it's a symlink.
"""
path = _patch_path(path)
try:
return _is_symlink(next(find_files(path)))
# comment below workaround for PyCQA/pyflakes#376
except WindowsError as orig_error: # noqa: F841
tmpl = "Error accessing {path}: {orig_error.message}"
raise builtins.WindowsError(tmpl.format(**locals()))
def _is_symlink(find_data):
return find_data.reserved[0] == api.IO_REPARSE_TAG_SYMLINK
return find_data.reserved[0] == api.IO_REPARSE_TAG_SYMLINK
def find_files(spec):
"""
A pythonic wrapper around the FindFirstFile/FindNextFile win32 api.
r"""
A pythonic wrapper around the FindFirstFile/FindNextFile win32 api.
>>> root_files = tuple(find_files(r'c:\*'))
>>> len(root_files) > 1
True
>>> root_files[0].filename == root_files[1].filename
False
>>> root_files = tuple(find_files(r'c:\*'))
>>> len(root_files) > 1
True
>>> root_files[0].filename == root_files[1].filename
False
This test might fail on a non-standard installation
>>> 'Windows' in (fd.filename for fd in root_files)
True
"""
fd = api.WIN32_FIND_DATA()
handle = api.FindFirstFile(spec, byref(fd))
while True:
if handle == api.INVALID_HANDLE_VALUE:
raise WindowsError()
yield fd
fd = api.WIN32_FIND_DATA()
res = api.FindNextFile(handle, byref(fd))
if res == 0: # error
error = WindowsError()
if error.code == api.ERROR_NO_MORE_FILES:
break
else:
raise error
# todo: how to close handle when generator is destroyed?
# hint: catch GeneratorExit
windll.kernel32.FindClose(handle)
This test might fail on a non-standard installation
>>> 'Windows' in (fd.filename for fd in root_files)
True
"""
fd = api.WIN32_FIND_DATA()
handle = api.FindFirstFile(spec, byref(fd))
while True:
if handle == api.INVALID_HANDLE_VALUE:
raise WindowsError()
yield fd
fd = api.WIN32_FIND_DATA()
res = api.FindNextFile(handle, byref(fd))
if res == 0: # error
error = WindowsError()
if error.code == api.ERROR_NO_MORE_FILES:
break
else:
raise error
# todo: how to close handle when generator is destroyed?
# hint: catch GeneratorExit
windll.kernel32.FindClose(handle)
def get_final_path(path):
"""
For a given path, determine the ultimate location of that path.
Useful for resolving symlink targets.
This functions wraps the GetFinalPathNameByHandle from the Windows
SDK.
r"""
For a given path, determine the ultimate location of that path.
Useful for resolving symlink targets.
This functions wraps the GetFinalPathNameByHandle from the Windows
SDK.
Note, this function fails if a handle cannot be obtained (such as
for C:\Pagefile.sys on a stock windows system). Consider using
trace_symlink_target instead.
"""
desired_access = api.NULL
share_mode = (
api.FILE_SHARE_READ | api.FILE_SHARE_WRITE | api.FILE_SHARE_DELETE
)
security_attributes = api.LPSECURITY_ATTRIBUTES() # NULL pointer
hFile = api.CreateFile(
path,
desired_access,
share_mode,
security_attributes,
api.OPEN_EXISTING,
api.FILE_FLAG_BACKUP_SEMANTICS,
api.NULL,
)
Note, this function fails if a handle cannot be obtained (such as
for C:\Pagefile.sys on a stock windows system). Consider using
trace_symlink_target instead.
"""
desired_access = api.NULL
share_mode = api.FILE_SHARE_READ | api.FILE_SHARE_WRITE | api.FILE_SHARE_DELETE
security_attributes = api.LPSECURITY_ATTRIBUTES() # NULL pointer
hFile = api.CreateFile(
path,
desired_access,
share_mode,
security_attributes,
api.OPEN_EXISTING,
api.FILE_FLAG_BACKUP_SEMANTICS,
api.NULL,
)
if hFile == api.INVALID_HANDLE_VALUE:
raise WindowsError()
if hFile == api.INVALID_HANDLE_VALUE:
raise WindowsError()
buf_size = api.GetFinalPathNameByHandle(
hFile, LPWSTR(), 0, api.VOLUME_NAME_DOS)
handle_nonzero_success(buf_size)
buf = create_unicode_buffer(buf_size)
result_length = api.GetFinalPathNameByHandle(
hFile, buf, len(buf), api.VOLUME_NAME_DOS)
buf_size = api.GetFinalPathNameByHandle(hFile, LPWSTR(), 0, api.VOLUME_NAME_DOS)
handle_nonzero_success(buf_size)
buf = create_unicode_buffer(buf_size)
result_length = api.GetFinalPathNameByHandle(
hFile, buf, len(buf), api.VOLUME_NAME_DOS
)
assert result_length < len(buf)
handle_nonzero_success(result_length)
handle_nonzero_success(api.CloseHandle(hFile))
assert result_length < len(buf)
handle_nonzero_success(result_length)
handle_nonzero_success(api.CloseHandle(hFile))
return buf[:result_length]
return buf[:result_length]
def compat_stat(path):
"""
Generate stat as found on Python 3.2 and later.
"""
stat = os.stat(path)
info = get_file_info(path)
# rewrite st_ino, st_dev, and st_nlink based on file info
return nt.stat_result(
(stat.st_mode,) +
(info.file_index, info.volume_serial_number, info.number_of_links) +
stat[4:]
)
"""
Generate stat as found on Python 3.2 and later.
"""
stat = os.stat(path)
info = get_file_info(path)
# rewrite st_ino, st_dev, and st_nlink based on file info
return nt.stat_result(
(stat.st_mode,)
+ (info.file_index, info.volume_serial_number, info.number_of_links)
+ stat[4:]
)
def samefile(f1, f2):
"""
Backport of samefile from Python 3.2 with support for Windows.
"""
return posixpath.samestat(compat_stat(f1), compat_stat(f2))
"""
Backport of samefile from Python 3.2 with support for Windows.
"""
return posixpath.samestat(compat_stat(f1), compat_stat(f2))
def get_file_info(path):
# open the file the same way CPython does in posixmodule.c
desired_access = api.FILE_READ_ATTRIBUTES
share_mode = 0
security_attributes = None
creation_disposition = api.OPEN_EXISTING
flags_and_attributes = (
api.FILE_ATTRIBUTE_NORMAL |
api.FILE_FLAG_BACKUP_SEMANTICS |
api.FILE_FLAG_OPEN_REPARSE_POINT
)
template_file = None
# open the file the same way CPython does in posixmodule.c
desired_access = api.FILE_READ_ATTRIBUTES
share_mode = 0
security_attributes = None
creation_disposition = api.OPEN_EXISTING
flags_and_attributes = (
api.FILE_ATTRIBUTE_NORMAL
| api.FILE_FLAG_BACKUP_SEMANTICS
| api.FILE_FLAG_OPEN_REPARSE_POINT
)
template_file = None
handle = api.CreateFile(
path,
desired_access,
share_mode,
security_attributes,
creation_disposition,
flags_and_attributes,
template_file,
)
handle = api.CreateFile(
path,
desired_access,
share_mode,
security_attributes,
creation_disposition,
flags_and_attributes,
template_file,
)
if handle == api.INVALID_HANDLE_VALUE:
raise WindowsError()
if handle == api.INVALID_HANDLE_VALUE:
raise WindowsError()
info = api.BY_HANDLE_FILE_INFORMATION()
res = api.GetFileInformationByHandle(handle, info)
handle_nonzero_success(res)
handle_nonzero_success(api.CloseHandle(handle))
info = api.BY_HANDLE_FILE_INFORMATION()
res = api.GetFileInformationByHandle(handle, info)
handle_nonzero_success(res)
handle_nonzero_success(api.CloseHandle(handle))
return info
return info
def GetBinaryType(filepath):
res = api.DWORD()
handle_nonzero_success(api._GetBinaryType(filepath, res))
return res
res = api.DWORD()
handle_nonzero_success(api._GetBinaryType(filepath, res))
return res
def _make_null_terminated_list(obs):
obs = _makelist(obs)
if obs is None:
return
return u'\x00'.join(obs) + u'\x00\x00'
obs = _makelist(obs)
if obs is None:
return
return u'\x00'.join(obs) + u'\x00\x00'
def _makelist(ob):
if ob is None:
return
if not isinstance(ob, (list, tuple, set)):
return [ob]
return ob
if ob is None:
return
if not isinstance(ob, (list, tuple, set)):
return [ob]
return ob
def SHFileOperation(operation, from_, to=None, flags=[]):
flags = functools.reduce(operator.or_, flags, 0)
from_ = _make_null_terminated_list(from_)
to = _make_null_terminated_list(to)
params = api.SHFILEOPSTRUCT(0, operation, from_, to, flags)
res = api._SHFileOperation(params)
if res != 0:
raise RuntimeError("SHFileOperation returned %d" % res)
flags = functools.reduce(operator.or_, flags, 0)
from_ = _make_null_terminated_list(from_)
to = _make_null_terminated_list(to)
params = api.SHFILEOPSTRUCT(0, operation, from_, to, flags)
res = api._SHFileOperation(params)
if res != 0:
raise RuntimeError("SHFileOperation returned %d" % res)
def join(*paths):
r"""
Wrapper around os.path.join that works with Windows drive letters.
r"""
Wrapper around os.path.join that works with Windows drive letters.
>>> join('d:\\foo', '\\bar')
'd:\\bar'
"""
paths_with_drives = map(os.path.splitdrive, paths)
drives, paths = zip(*paths_with_drives)
# the drive we care about is the last one in the list
drive = next(filter(None, reversed(drives)), '')
return os.path.join(drive, os.path.join(*paths))
>>> join('d:\\foo', '\\bar')
'd:\\bar'
"""
paths_with_drives = map(os.path.splitdrive, paths)
drives, paths = zip(*paths_with_drives)
# the drive we care about is the last one in the list
drive = next(filter(None, reversed(drives)), '')
return os.path.join(drive, os.path.join(*paths))
def resolve_path(target, start=os.path.curdir):
r"""
Find a path from start to target where target is relative to start.
r"""
Find a path from start to target where target is relative to start.
>>> tmp = str(getfixture('tmpdir_as_cwd'))
>>> tmp = str(getfixture('tmpdir_as_cwd'))
>>> findpath('d:\\')
'd:\\'
>>> findpath('d:\\')
'd:\\'
>>> findpath('d:\\', tmp)
'd:\\'
>>> findpath('d:\\', tmp)
'd:\\'
>>> findpath('\\bar', 'd:\\')
'd:\\bar'
>>> findpath('\\bar', 'd:\\')
'd:\\bar'
>>> findpath('\\bar', 'd:\\foo') # fails with '\\bar'
'd:\\bar'
>>> findpath('\\bar', 'd:\\foo') # fails with '\\bar'
'd:\\bar'
>>> findpath('bar', 'd:\\foo')
'd:\\foo\\bar'
>>> findpath('bar', 'd:\\foo')
'd:\\foo\\bar'
>>> findpath('\\baz', 'd:\\foo\\bar') # fails with '\\baz'
'd:\\baz'
>>> findpath('\\baz', 'd:\\foo\\bar') # fails with '\\baz'
'd:\\baz'
>>> os.path.abspath(findpath('\\bar')).lower()
'c:\\bar'
>>> os.path.abspath(findpath('\\bar')).lower()
'c:\\bar'
>>> os.path.abspath(findpath('bar'))
'...\\bar'
>>> os.path.abspath(findpath('bar'))
'...\\bar'
>>> findpath('..', 'd:\\foo\\bar')
'd:\\foo'
>>> findpath('..', 'd:\\foo\\bar')
'd:\\foo'
The parent of the root directory is the root directory.
>>> findpath('..', 'd:\\')
'd:\\'
"""
return os.path.normpath(join(start, target))
The parent of the root directory is the root directory.
>>> findpath('..', 'd:\\')
'd:\\'
"""
return os.path.normpath(join(start, target))
findpath = resolve_path
def trace_symlink_target(link):
"""
Given a file that is known to be a symlink, trace it to its ultimate
target.
"""
Given a file that is known to be a symlink, trace it to its ultimate
target.
Raises TargetNotPresent when the target cannot be determined.
Raises ValueError when the specified link is not a symlink.
"""
Raises TargetNotPresent when the target cannot be determined.
Raises ValueError when the specified link is not a symlink.
"""
if not is_symlink(link):
raise ValueError("link must point to a symlink on the system")
while is_symlink(link):
orig = os.path.dirname(link)
link = readlink(link)
link = resolve_path(link, orig)
return link
if not is_symlink(link):
raise ValueError("link must point to a symlink on the system")
while is_symlink(link):
orig = os.path.dirname(link)
link = readlink(link)
link = resolve_path(link, orig)
return link
def readlink(link):
"""
readlink(link) -> target
Return a string representing the path to which the symbolic link points.
"""
handle = api.CreateFile(
link,
0,
0,
None,
api.OPEN_EXISTING,
api.FILE_FLAG_OPEN_REPARSE_POINT | api.FILE_FLAG_BACKUP_SEMANTICS,
None,
)
"""
readlink(link) -> target
Return a string representing the path to which the symbolic link points.
"""
handle = api.CreateFile(
link,
0,
0,
None,
api.OPEN_EXISTING,
api.FILE_FLAG_OPEN_REPARSE_POINT | api.FILE_FLAG_BACKUP_SEMANTICS,
None,
)
if handle == api.INVALID_HANDLE_VALUE:
raise WindowsError()
if handle == api.INVALID_HANDLE_VALUE:
raise WindowsError()
res = reparse.DeviceIoControl(
handle, api.FSCTL_GET_REPARSE_POINT, None, 10240)
res = reparse.DeviceIoControl(handle, api.FSCTL_GET_REPARSE_POINT, None, 10240)
bytes = create_string_buffer(res)
p_rdb = cast(bytes, POINTER(api.REPARSE_DATA_BUFFER))
rdb = p_rdb.contents
if not rdb.tag == api.IO_REPARSE_TAG_SYMLINK:
raise RuntimeError("Expected IO_REPARSE_TAG_SYMLINK, but got %d" % rdb.tag)
bytes = create_string_buffer(res)
p_rdb = cast(bytes, POINTER(api.REPARSE_DATA_BUFFER))
rdb = p_rdb.contents
if not rdb.tag == api.IO_REPARSE_TAG_SYMLINK:
raise RuntimeError("Expected IO_REPARSE_TAG_SYMLINK, but got %d" % rdb.tag)
handle_nonzero_success(api.CloseHandle(handle))
return rdb.get_substitute_name()
handle_nonzero_success(api.CloseHandle(handle))
return rdb.get_substitute_name()
def patch_os_module():
"""
jaraco.windows provides the os.symlink and os.readlink functions.
Monkey-patch the os module to include them if not present.
"""
if not hasattr(os, 'symlink'):
os.symlink = symlink
os.path.islink = islink
if not hasattr(os, 'readlink'):
os.readlink = readlink
"""
jaraco.windows provides the os.symlink and os.readlink functions.
Monkey-patch the os module to include them if not present.
"""
if not hasattr(os, 'symlink'):
os.symlink = symlink
os.path.islink = islink
if not hasattr(os, 'readlink'):
os.readlink = readlink
def find_symlinks(root):
for dirpath, dirnames, filenames in os.walk(root):
for name in dirnames + filenames:
pathname = os.path.join(dirpath, name)
if is_symlink(pathname):
yield pathname
# don't traverse symlinks
if name in dirnames:
dirnames.remove(name)
for dirpath, dirnames, filenames in os.walk(root):
for name in dirnames + filenames:
pathname = os.path.join(dirpath, name)
if is_symlink(pathname):
yield pathname
# don't traverse symlinks
if name in dirnames:
dirnames.remove(name)
def find_symlinks_cmd():
"""
%prog [start-path]
Search the specified path (defaults to the current directory) for symlinks,
printing the source and target on each line.
"""
from optparse import OptionParser
from textwrap import dedent
parser = OptionParser(usage=dedent(find_symlinks_cmd.__doc__).strip())
options, args = parser.parse_args()
if not args:
args = ['.']
root = args.pop()
if args:
parser.error("unexpected argument(s)")
try:
for symlink in find_symlinks(root):
target = readlink(symlink)
dir = ['', 'D'][os.path.isdir(symlink)]
msg = '{dir:2}{symlink} --> {target}'.format(**locals())
print(msg)
except KeyboardInterrupt:
pass
"""
%prog [start-path]
Search the specified path (defaults to the current directory) for symlinks,
printing the source and target on each line.
"""
from optparse import OptionParser
from textwrap import dedent
parser = OptionParser(usage=dedent(find_symlinks_cmd.__doc__).strip())
options, args = parser.parse_args()
if not args:
args = ['.']
root = args.pop()
if args:
parser.error("unexpected argument(s)")
try:
for symlink in find_symlinks(root):
target = readlink(symlink)
dir = ['', 'D'][os.path.isdir(symlink)]
msg = '{dir:2}{symlink} --> {target}'.format(**locals())
print(msg)
except KeyboardInterrupt:
pass
@six.add_metaclass(binary.BitMask)
class FileAttributes(int):
class FileAttributes(int, metaclass=binary.BitMask):
# extract the values from the stat module on Python 3.5
# and later.
locals().update(
(name.split('FILE_ATTRIBUTES_')[1].lower(), value)
for name, value in vars(stat).items()
if name.startswith('FILE_ATTRIBUTES_')
)
# extract the values from the stat module on Python 3.5
# and later.
locals().update(
(name.split('FILE_ATTRIBUTES_')[1].lower(), value)
for name, value in vars(stat).items()
if name.startswith('FILE_ATTRIBUTES_')
)
# For Python 3.4 and earlier, define the constants here
archive = 0x20
compressed = 0x800
hidden = 0x2
device = 0x40
directory = 0x10
encrypted = 0x4000
normal = 0x80
not_content_indexed = 0x2000
offline = 0x1000
read_only = 0x1
reparse_point = 0x400
sparse_file = 0x200
system = 0x4
temporary = 0x100
virtual = 0x10000
# For Python 3.4 and earlier, define the constants here
archive = 0x20
compressed = 0x800
hidden = 0x2
device = 0x40
directory = 0x10
encrypted = 0x4000
normal = 0x80
not_content_indexed = 0x2000
offline = 0x1000
read_only = 0x1
reparse_point = 0x400
sparse_file = 0x200
system = 0x4
temporary = 0x100
virtual = 0x10000
@classmethod
def get(cls, filepath):
attrs = api.GetFileAttributes(filepath)
if attrs == api.INVALID_FILE_ATTRIBUTES:
raise WindowsError()
return cls(attrs)
@classmethod
def get(cls, filepath):
attrs = api.GetFileAttributes(filepath)
if attrs == api.INVALID_FILE_ATTRIBUTES:
raise WindowsError()
return cls(attrs)
GetFileAttributes = FileAttributes.get
def SetFileAttributes(filepath, *attrs):
"""
Set file attributes. e.g.:
"""
Set file attributes. e.g.:
SetFileAttributes('C:\\foo', 'hidden')
SetFileAttributes('C:\\foo', 'hidden')
Each attr must be either a numeric value, a constant defined in
jaraco.windows.filesystem.api, or one of the nice names
defined in this function.
"""
nice_names = collections.defaultdict(
lambda key: key,
hidden='FILE_ATTRIBUTE_HIDDEN',
read_only='FILE_ATTRIBUTE_READONLY',
)
flags = (getattr(api, nice_names[attr], attr) for attr in attrs)
flags = functools.reduce(operator.or_, flags)
handle_nonzero_success(api.SetFileAttributes(filepath, flags))
Each attr must be either a numeric value, a constant defined in
jaraco.windows.filesystem.api, or one of the nice names
defined in this function.
"""
nice_names = collections.defaultdict(
lambda key: key,
hidden='FILE_ATTRIBUTE_HIDDEN',
read_only='FILE_ATTRIBUTE_READONLY',
)
flags = (getattr(api, nice_names[attr], attr) for attr in attrs)
flags = functools.reduce(operator.or_, flags)
handle_nonzero_success(api.SetFileAttributes(filepath, flags))

View file

@ -1,109 +1,107 @@
from __future__ import unicode_literals
import os.path
# realpath taken from https://bugs.python.org/file38057/issue9949-v4.patch
def realpath(path):
if isinstance(path, str):
prefix = '\\\\?\\'
unc_prefix = prefix + 'UNC'
new_unc_prefix = '\\'
cwd = os.getcwd()
else:
prefix = b'\\\\?\\'
unc_prefix = prefix + b'UNC'
new_unc_prefix = b'\\'
cwd = os.getcwdb()
had_prefix = path.startswith(prefix)
path, ok = _resolve_path(cwd, path, {})
# The path returned by _getfinalpathname will always start with \\?\ -
# strip off that prefix unless it was already provided on the original
# path.
if not had_prefix:
# For UNC paths, the prefix will actually be \\?\UNC - handle that
# case as well.
if path.startswith(unc_prefix):
path = new_unc_prefix + path[len(unc_prefix):]
elif path.startswith(prefix):
path = path[len(prefix):]
return path
if isinstance(path, str):
prefix = '\\\\?\\'
unc_prefix = prefix + 'UNC'
new_unc_prefix = '\\'
cwd = os.getcwd()
else:
prefix = b'\\\\?\\'
unc_prefix = prefix + b'UNC'
new_unc_prefix = b'\\'
cwd = os.getcwdb()
had_prefix = path.startswith(prefix)
path, ok = _resolve_path(cwd, path, {})
# The path returned by _getfinalpathname will always start with \\?\ -
# strip off that prefix unless it was already provided on the original
# path.
if not had_prefix:
# For UNC paths, the prefix will actually be \\?\UNC - handle that
# case as well.
if path.startswith(unc_prefix):
path = new_unc_prefix + path[len(unc_prefix) :]
elif path.startswith(prefix):
path = path[len(prefix) :]
return path
def _resolve_path(path, rest, seen):
# Windows normalizes the path before resolving symlinks; be sure to
# follow the same behavior.
rest = os.path.normpath(rest)
def _resolve_path(path, rest, seen): # noqa: C901
# Windows normalizes the path before resolving symlinks; be sure to
# follow the same behavior.
rest = os.path.normpath(rest)
if isinstance(rest, str):
sep = '\\'
else:
sep = b'\\'
if isinstance(rest, str):
sep = '\\'
else:
sep = b'\\'
if os.path.isabs(rest):
drive, rest = os.path.splitdrive(rest)
path = drive + sep
rest = rest[1:]
if os.path.isabs(rest):
drive, rest = os.path.splitdrive(rest)
path = drive + sep
rest = rest[1:]
while rest:
name, _, rest = rest.partition(sep)
new_path = os.path.join(path, name) if path else name
if os.path.exists(new_path):
if not rest:
# The whole path exists. Resolve it using the OS.
path = os.path._getfinalpathname(new_path)
else:
# The OS can resolve `new_path`; keep traversing the path.
path = new_path
elif not os.path.lexists(new_path):
# `new_path` does not exist on the filesystem at all. Use the
# OS to resolve `path`, if it exists, and then append the
# remainder.
if os.path.exists(path):
path = os.path._getfinalpathname(path)
rest = os.path.join(name, rest) if rest else name
return os.path.join(path, rest), True
else:
# We have a symbolic link that the OS cannot resolve. Try to
# resolve it ourselves.
while rest:
name, _, rest = rest.partition(sep)
new_path = os.path.join(path, name) if path else name
if os.path.exists(new_path):
if not rest:
# The whole path exists. Resolve it using the OS.
path = os.path._getfinalpathname(new_path)
else:
# The OS can resolve `new_path`; keep traversing the path.
path = new_path
elif not os.path.lexists(new_path):
# `new_path` does not exist on the filesystem at all. Use the
# OS to resolve `path`, if it exists, and then append the
# remainder.
if os.path.exists(path):
path = os.path._getfinalpathname(path)
rest = os.path.join(name, rest) if rest else name
return os.path.join(path, rest), True
else:
# We have a symbolic link that the OS cannot resolve. Try to
# resolve it ourselves.
# On Windows, symbolic link resolution can be partially or
# fully disabled [1]. The end result of a disabled symlink
# appears the same as a broken symlink (lexists() returns True
# but exists() returns False). And in both cases, the link can
# still be read using readlink(). Call stat() and check the
# resulting error code to ensure we don't circumvent the
# Windows symbolic link restrictions.
# [1] https://technet.microsoft.com/en-us/library/cc754077.aspx
try:
os.stat(new_path)
except OSError as e:
# WinError 1463: The symbolic link cannot be followed
# because its type is disabled.
if e.winerror == 1463:
raise
# On Windows, symbolic link resolution can be partially or
# fully disabled [1]. The end result of a disabled symlink
# appears the same as a broken symlink (lexists() returns True
# but exists() returns False). And in both cases, the link can
# still be read using readlink(). Call stat() and check the
# resulting error code to ensure we don't circumvent the
# Windows symbolic link restrictions.
# [1] https://technet.microsoft.com/en-us/library/cc754077.aspx
try:
os.stat(new_path)
except OSError as e:
# WinError 1463: The symbolic link cannot be followed
# because its type is disabled.
if e.winerror == 1463:
raise
key = os.path.normcase(new_path)
if key in seen:
# This link has already been seen; try to use the
# previously resolved value.
path = seen[key]
if path is None:
# It has not yet been resolved, which means we must
# have a symbolic link loop. Return what we have
# resolved so far plus the remainder of the path (who
# cares about the Zen of Python?).
path = os.path.join(new_path, rest) if rest else new_path
return path, False
else:
# Mark this link as in the process of being resolved.
seen[key] = None
# Try to resolve it.
path, ok = _resolve_path(path, os.readlink(new_path), seen)
if ok:
# Resolution succeded; store the resolved value.
seen[key] = path
else:
# Resolution failed; punt.
return (os.path.join(path, rest) if rest else path), False
return path, True
key = os.path.normcase(new_path)
if key in seen:
# This link has already been seen; try to use the
# previously resolved value.
path = seen[key]
if path is None:
# It has not yet been resolved, which means we must
# have a symbolic link loop. Return what we have
# resolved so far plus the remainder of the path (who
# cares about the Zen of Python?).
path = os.path.join(new_path, rest) if rest else new_path
return path, False
else:
# Mark this link as in the process of being resolved.
seen[key] = None
# Try to resolve it.
path, ok = _resolve_path(path, os.readlink(new_path), seen)
if ok:
# Resolution succeded; store the resolved value.
seen[key] = path
else:
# Resolution failed; punt.
return (os.path.join(path, rest) if rest else path), False
return path, True

View file

@ -1,14 +1,10 @@
# -*- coding: UTF-8 -*-
"""
FileChange
Classes and routines for monitoring the file system for changes.
Classes and routines for monitoring the file system for changes.
Copyright © 2004, 2011, 2013 Jason R. Coombs
"""
from __future__ import print_function
import os
import sys
import datetime
@ -17,8 +13,6 @@ from threading import Thread
import itertools
import logging
import six
from more_itertools.recipes import consume
import jaraco.text
@ -29,243 +23,237 @@ log = logging.getLogger(__name__)
class NotifierException(Exception):
pass
pass
class FileFilter(object):
def set_root(self, root):
self.root = root
def set_root(self, root):
self.root = root
def _get_file_path(self, filename):
try:
filename = os.path.join(self.root, filename)
except AttributeError:
pass
return filename
def _get_file_path(self, filename):
try:
filename = os.path.join(self.root, filename)
except AttributeError:
pass
return filename
class ModifiedTimeFilter(FileFilter):
"""
Returns true for each call where the modified time of the file is after
the cutoff time.
"""
def __init__(self, cutoff):
self.cutoff = cutoff
"""
Returns true for each call where the modified time of the file is after
the cutoff time.
"""
def __call__(self, file):
filepath = self._get_file_path(file)
last_mod = datetime.datetime.utcfromtimestamp(
os.stat(filepath).st_mtime)
log.debug('{filepath} last modified at {last_mod}.'.format(**vars()))
return last_mod > self.cutoff
def __init__(self, cutoff):
self.cutoff = cutoff
def __call__(self, file):
filepath = self._get_file_path(file)
last_mod = datetime.datetime.utcfromtimestamp(os.stat(filepath).st_mtime)
log.debug('{filepath} last modified at {last_mod}.'.format(**vars()))
return last_mod > self.cutoff
class PatternFilter(FileFilter):
"""
Filter that returns True for files that match pattern (a regular
expression).
"""
def __init__(self, pattern):
self.pattern = (
re.compile(pattern) if isinstance(pattern, six.string_types)
else pattern
)
"""
Filter that returns True for files that match pattern (a regular
expression).
"""
def __call__(self, file):
return bool(self.pattern.match(file, re.I))
def __init__(self, pattern):
self.pattern = re.compile(pattern) if isinstance(pattern, str) else pattern
def __call__(self, file):
return bool(self.pattern.match(file, re.I))
class GlobFilter(PatternFilter):
"""
Filter that returns True for files that match the pattern (a glob
expression.
"""
def __init__(self, expression):
super(GlobFilter, self).__init__(
self.convert_file_pattern(expression))
"""
Filter that returns True for files that match the pattern (a glob
expression.
"""
@staticmethod
def convert_file_pattern(p):
r"""
converts a filename specification (such as c:\*.*) to an equivelent
regular expression
>>> GlobFilter.convert_file_pattern('/*')
'/.*'
"""
subs = (('\\', '\\\\'), ('.', '\\.'), ('*', '.*'), ('?', '.'))
return jaraco.text.multi_substitution(*subs)(p)
def __init__(self, expression):
super(GlobFilter, self).__init__(self.convert_file_pattern(expression))
@staticmethod
def convert_file_pattern(p):
r"""
converts a filename specification (such as c:\*.*) to an equivelent
regular expression
>>> GlobFilter.convert_file_pattern('/*')
'/.*'
"""
subs = (('\\', '\\\\'), ('.', '\\.'), ('*', '.*'), ('?', '.'))
return jaraco.text.multi_substitution(*subs)(p)
class AggregateFilter(FileFilter):
"""
This file filter will aggregate the filters passed to it, and when called,
will return the results of each filter ANDed together.
"""
def __init__(self, *filters):
self.filters = filters
"""
This file filter will aggregate the filters passed to it, and when called,
will return the results of each filter ANDed together.
"""
def set_root(self, root):
consume(f.set_root(root) for f in self.filters)
def __init__(self, *filters):
self.filters = filters
def __call__(self, file):
return all(fil(file) for fil in self.filters)
def set_root(self, root):
consume(f.set_root(root) for f in self.filters)
def __call__(self, file):
return all(fil(file) for fil in self.filters)
class OncePerModFilter(FileFilter):
def __init__(self):
self.history = list()
def __init__(self):
self.history = list()
def __call__(self, file):
file = os.path.join(self.root, file)
key = file, os.stat(file).st_mtime
result = key not in self.history
self.history.append(key)
if len(self.history) > 100:
del self.history[-50:]
return result
def __call__(self, file):
file = os.path.join(self.root, file)
key = file, os.stat(file).st_mtime
result = key not in self.history
self.history.append(key)
if len(self.history) > 100:
del self.history[-50:]
return result
def files_with_path(files, path):
return (os.path.join(path, file) for file in files)
return (os.path.join(path, file) for file in files)
def get_file_paths(walk_result):
root, dirs, files = walk_result
return files_with_path(files, root)
root, dirs, files = walk_result
return files_with_path(files, root)
class Notifier(object):
def __init__(self, root='.', filters=[]):
# assign the root, verify it exists
self.root = root
if not os.path.isdir(self.root):
raise NotifierException(
'Root directory "%s" does not exist' % self.root)
self.filters = filters
def __init__(self, root='.', filters=[]):
# assign the root, verify it exists
self.root = root
if not os.path.isdir(self.root):
raise NotifierException('Root directory "%s" does not exist' % self.root)
self.filters = filters
self.watch_subtree = False
self.quit_event = event.CreateEvent(None, 0, 0, None)
self.opm_filter = OncePerModFilter()
self.watch_subtree = False
self.quit_event = event.CreateEvent(None, 0, 0, None)
self.opm_filter = OncePerModFilter()
def __del__(self):
try:
fs.FindCloseChangeNotification(self.hChange)
except Exception:
pass
def __del__(self):
try:
fs.FindCloseChangeNotification(self.hChange)
except Exception:
pass
def _get_change_handle(self):
# set up to monitor the directory tree specified
self.hChange = fs.FindFirstChangeNotification(
self.root,
self.watch_subtree,
fs.FILE_NOTIFY_CHANGE_LAST_WRITE,
)
def _get_change_handle(self):
# set up to monitor the directory tree specified
self.hChange = fs.FindFirstChangeNotification(
self.root, self.watch_subtree, fs.FILE_NOTIFY_CHANGE_LAST_WRITE
)
# make sure it worked; if not, bail
INVALID_HANDLE_VALUE = fs.INVALID_HANDLE_VALUE
if self.hChange == INVALID_HANDLE_VALUE:
raise NotifierException(
'Could not set up directory change notification')
# make sure it worked; if not, bail
INVALID_HANDLE_VALUE = fs.INVALID_HANDLE_VALUE
if self.hChange == INVALID_HANDLE_VALUE:
raise NotifierException('Could not set up directory change notification')
@staticmethod
def _filtered_walk(path, file_filter):
"""
static method that calls os.walk, but filters out
anything that doesn't match the filter
"""
for root, dirs, files in os.walk(path):
log.debug('looking in %s', root)
log.debug('files is %s', files)
file_filter.set_root(root)
files = filter(file_filter, files)
log.debug('filtered files is %s', files)
yield (root, dirs, files)
@staticmethod
def _filtered_walk(path, file_filter):
"""
static method that calls os.walk, but filters out
anything that doesn't match the filter
"""
for root, dirs, files in os.walk(path):
log.debug('looking in %s', root)
log.debug('files is %s', files)
file_filter.set_root(root)
files = filter(file_filter, files)
log.debug('filtered files is %s', files)
yield (root, dirs, files)
def quit(self):
event.SetEvent(self.quit_event)
def quit(self):
event.SetEvent(self.quit_event)
class BlockingNotifier(Notifier):
@staticmethod
def wait_results(*args):
"""calls WaitForMultipleObjects repeatedly with args"""
return itertools.starmap(event.WaitForMultipleObjects, itertools.repeat(args))
@staticmethod
def wait_results(*args):
""" calls WaitForMultipleObjects repeatedly with args """
return itertools.starmap(
event.WaitForMultipleObjects,
itertools.repeat(args))
def get_changed_files(self):
self._get_change_handle()
check_time = datetime.datetime.utcnow()
# block (sleep) until something changes in the
# target directory or a quit is requested.
# timeout so we can catch keyboard interrupts or other exceptions
events = (self.hChange, self.quit_event)
for result in self.wait_results(events, False, 1000):
if result == event.WAIT_TIMEOUT:
continue
index = result - event.WAIT_OBJECT_0
if events[index] is self.quit_event:
# quit was received; stop yielding results
return
def get_changed_files(self):
self._get_change_handle()
check_time = datetime.datetime.utcnow()
# block (sleep) until something changes in the
# target directory or a quit is requested.
# timeout so we can catch keyboard interrupts or other exceptions
events = (self.hChange, self.quit_event)
for result in self.wait_results(events, False, 1000):
if result == event.WAIT_TIMEOUT:
continue
index = result - event.WAIT_OBJECT_0
if events[index] is self.quit_event:
# quit was received; stop yielding results
return
# something has changed.
log.debug('Change notification received')
fs.FindNextChangeNotification(self.hChange)
next_check_time = datetime.datetime.utcnow()
log.debug('Looking for all files changed after %s', check_time)
for file in self.find_files_after(check_time):
yield file
check_time = next_check_time
# something has changed.
log.debug('Change notification received')
fs.FindNextChangeNotification(self.hChange)
next_check_time = datetime.datetime.utcnow()
log.debug('Looking for all files changed after %s', check_time)
for file in self.find_files_after(check_time):
yield file
check_time = next_check_time
def find_files_after(self, cutoff):
mtf = ModifiedTimeFilter(cutoff)
af = AggregateFilter(mtf, self.opm_filter, *self.filters)
results = Notifier._filtered_walk(self.root, af)
results = itertools.imap(get_file_paths, results)
if self.watch_subtree:
result = itertools.chain(*results)
else:
result = next(results)
return result
def find_files_after(self, cutoff):
mtf = ModifiedTimeFilter(cutoff)
af = AggregateFilter(mtf, self.opm_filter, *self.filters)
results = Notifier._filtered_walk(self.root, af)
results = itertools.imap(get_file_paths, results)
if self.watch_subtree:
result = itertools.chain(*results)
else:
result = next(results)
return result
class ThreadedNotifier(BlockingNotifier, Thread):
r"""
ThreadedNotifier provides a simple interface that calls the handler
for each file rooted in root that passes the filters. It runs as its own
thread, so must be started as such::
r"""
ThreadedNotifier provides a simple interface that calls the handler
for each file rooted in root that passes the filters. It runs as its own
thread, so must be started as such::
notifier = ThreadedNotifier('c:\\', handler = StreamHandler())
notifier.start()
C:\Autoexec.bat changed.
"""
def __init__(self, root='.', filters=[], handler=lambda file: None):
# init notifier stuff
BlockingNotifier.__init__(self, root, filters)
# init thread stuff
Thread.__init__(self)
# set it as a daemon thread so that it doesn't block waiting to close.
# I tried setting __del__(self) to .quit(), but unfortunately, there
# are references to this object in the win32api stuff, so __del__
# never gets called.
self.setDaemon(True)
notifier = ThreadedNotifier('c:\\', handler = StreamHandler())
notifier.start()
C:\Autoexec.bat changed.
"""
self.handle = handler
def __init__(self, root='.', filters=[], handler=lambda file: None):
# init notifier stuff
BlockingNotifier.__init__(self, root, filters)
# init thread stuff
Thread.__init__(self)
# set it as a daemon thread so that it doesn't block waiting to close.
# I tried setting __del__(self) to .quit(), but unfortunately, there
# are references to this object in the win32api stuff, so __del__
# never gets called.
self.setDaemon(True)
def run(self):
for file in self.get_changed_files():
self.handle(file)
self.handle = handler
def run(self):
for file in self.get_changed_files():
self.handle(file)
class StreamHandler(object):
"""
StreamHandler: a sample handler object for use with the threaded
notifier that will announce by writing to the supplied stream
(stdout by default) the name of the file.
"""
def __init__(self, output=sys.stdout):
self.output = output
"""
StreamHandler: a sample handler object for use with the threaded
notifier that will announce by writing to the supplied stream
(stdout by default) the name of the file.
"""
def __call__(self, filename):
self.output.write('%s changed.\n' % filename)
def __init__(self, output=sys.stdout):
self.output = output
def __call__(self, filename):
self.output.write('%s changed.\n' % filename)