add jaraco to test sym links on Windows. Fixes #894

This commit is contained in:
clinton-hall 2015-12-19 11:29:22 +10:30
commit 52cc753881
50 changed files with 3862 additions and 7 deletions

View file

@ -252,14 +252,23 @@ def copy_link(src, targetLink, useLink):
return True
def replace_links(link):
if not os.path.islink(link):
logger.debug('%s is not a link' % (link))
return
target = link
n = 0
while os.path.islink(target):
target = os.readlink(target)
n = n + 1
target = link
if os.name == 'nt':
import jaraco
if not jaraco.windows.filesystem.islink(link):
logger.debug('%s is not a link' % (link))
return
while jaraco.windows.filesystem.islink(target):
target = jaraco.windows.filesystem.readlink(target)
n = n + 1
else:
if not os.path.islink(link):
logger.debug('%s is not a link' % (link))
return
while os.path.islink(target):
target = os.readlink(target)
n = n + 1
if n > 1:
logger.info("Changing sym-link: %s to point directly to file: %s" % (link, target), 'COPYLINK')
os.unlink(link)

10
libs/jaraco/__init__.py Normal file
View file

@ -0,0 +1,10 @@
# this is a namespace package
__import__('pkg_resources').declare_namespace(__name__)
try:
# py2exe support (http://www.py2exe.org/index.cgi/ExeWithEggs)
import modulefinder
for p in __path__:
modulefinder.AddPackagePath(__name__, p)
except ImportError:
pass

View file

@ -0,0 +1,13 @@
#!/usr/bin/env python
# $Id$
"""
jaraco.windows
A lightweight wrapper to provide a pythonic API to the Windows Platform.
This package attempts to provide interfaces similar or compatible
with Mark Hammond's pywin32 library, but avoids the use of extension
modules by utilizing ctypes.
"""

View file

View file

@ -0,0 +1,47 @@
import ctypes.wintypes
# Clipboard Formats
CF_TEXT = 1
CF_BITMAP = 2
CF_METAFILEPICT = 3
CF_SYLK = 4
CF_DIF = 5
CF_TIFF = 6
CF_OEMTEXT = 7
CF_DIB = 8
CF_PALETTE = 9
CF_PENDATA = 10
CF_RIFF = 11
CF_WAVE = 12
CF_UNICODETEXT = 13
CF_ENHMETAFILE = 14
CF_HDROP = 15
CF_LOCALE = 16
CF_DIBV5 = 17
CF_MAX = 18
CF_OWNERDISPLAY = 0x0080
CF_DSPTEXT = 0x0081
CF_DSPBITMAP = 0x0082
CF_DSPMETAFILEPICT = 0x0083
CF_DSPENHMETAFILE = 0x008E
CF_PRIVATEFIRST = 0x0200
CF_PRIVATELAST = 0x02FF
CF_GDIOBJFIRST = 0x0300
CF_GDIOBJLAST = 0x03FF
RegisterClipboardFormat = ctypes.windll.user32.RegisterClipboardFormatW
RegisterClipboardFormat.argtypes = ctypes.wintypes.LPWSTR,
RegisterClipboardFormat.restype = ctypes.wintypes.UINT
CF_HTML = RegisterClipboardFormat('HTML Format')
EnumClipboardFormats = ctypes.windll.user32.EnumClipboardFormats
EnumClipboardFormats.argtypes = ctypes.wintypes.UINT,
EnumClipboardFormats.restype = ctypes.wintypes.UINT
GetClipboardData = ctypes.windll.user32.GetClipboardData
GetClipboardData.argtypes = ctypes.wintypes.UINT,
GetClipboardData.restype = ctypes.wintypes.HANDLE
SetClipboardData = ctypes.windll.user32.SetClipboardData
SetClipboardData.argtypes = ctypes.wintypes.UINT, ctypes.wintypes.HANDLE
SetClipboardData.restype = ctypes.wintypes.HANDLE

View file

@ -0,0 +1,59 @@
"""
Support for Credential Vault
"""
import ctypes
from ctypes.wintypes import DWORD, LPCWSTR, BOOL, LPWSTR, FILETIME
try:
from ctypes.wintypes import LPBYTE
except ImportError:
LPBYTE = ctypes.POINTER(ctypes.wintypes.BYTE)
class CredentialAttribute(ctypes.Structure):
_fields_ = []
class Credential(ctypes.Structure):
_fields_ = [
('flags', DWORD),
('type', DWORD),
('target_name', LPWSTR),
('comment', LPWSTR),
('last_written', FILETIME),
('credential_blob_size', DWORD),
('credential_blob', LPBYTE),
('persist', DWORD),
('attribute_count', DWORD),
('attributes', ctypes.POINTER(CredentialAttribute)),
('target_alias', LPWSTR),
('user_name', LPWSTR),
]
def __del__(self):
ctypes.windll.advapi32.CredFree(ctypes.byref(self))
PCREDENTIAL = ctypes.POINTER(Credential)
CredRead = ctypes.windll.advapi32.CredReadW
CredRead.argtypes = (
LPCWSTR, # TargetName
DWORD, # Type
DWORD, # Flags
ctypes.POINTER(PCREDENTIAL), # Credential
)
CredRead.restype = BOOL
CredWrite = ctypes.windll.advapi32.CredWriteW
CredWrite.argtypes = (
PCREDENTIAL, # Credential
DWORD, # Flags
)
CredWrite.restype = BOOL
CredDelete = ctypes.windll.advapi32.CredDeleteW
CredDelete.argtypes = (
LPCWSTR, # TargetName
DWORD, # Type
DWORD, # Flags
)
CredDelete.restype = BOOL

View file

@ -0,0 +1,13 @@
import ctypes.wintypes
SetEnvironmentVariable = ctypes.windll.kernel32.SetEnvironmentVariableW
SetEnvironmentVariable.restype = ctypes.wintypes.BOOL
SetEnvironmentVariable.argtypes = [ctypes.wintypes.LPCWSTR]*2
GetEnvironmentVariable = ctypes.windll.kernel32.GetEnvironmentVariableW
GetEnvironmentVariable.restype = ctypes.wintypes.BOOL
GetEnvironmentVariable.argtypes = [
ctypes.wintypes.LPCWSTR,
ctypes.wintypes.LPWSTR,
ctypes.wintypes.DWORD,
]

View file

@ -0,0 +1,7 @@
# from error.h
NO_ERROR = 0
ERROR_INSUFFICIENT_BUFFER = 122
ERROR_BUFFER_OVERFLOW = 111
ERROR_NO_DATA = 232
ERROR_INVALID_PARAMETER = 87
ERROR_NOT_SUPPORTED = 50

View file

@ -0,0 +1,41 @@
from ctypes import (
Structure, windll, POINTER, byref, cast, create_unicode_buffer,
c_size_t, c_int, create_string_buffer, c_uint64, c_ushort, c_short,
c_uint,
)
from ctypes.wintypes import (
BOOLEAN, LPWSTR, DWORD, LPVOID, HANDLE, FILETIME,
WCHAR, BOOL, HWND, WORD, UINT,
)
CreateEvent = windll.kernel32.CreateEventW
CreateEvent.argtypes = (
LPVOID, # LPSECURITY_ATTRIBUTES
BOOL,
BOOL,
LPWSTR,
)
CreateEvent.restype = HANDLE
SetEvent = windll.kernel32.SetEvent
SetEvent.argtypes = HANDLE,
SetEvent.restype = BOOL
WaitForSingleObject = windll.kernel32.WaitForSingleObject
WaitForSingleObject.argtypes = HANDLE, DWORD
WaitForSingleObject.restype = DWORD
_WaitForMultipleObjects = windll.kernel32.WaitForMultipleObjects
_WaitForMultipleObjects.argtypes = DWORD, POINTER(HANDLE), BOOL, DWORD
_WaitForMultipleObjects.restype = DWORD
def WaitForMultipleObjects(handles, wait_all=False, timeout=0):
n_handles = len(handles)
handle_array = (HANDLE*n_handles)()
for index, handle in enumerate(handles):
handle_array[index] = handle
return _WaitForMultipleObjects(n_handles, handle_array, wait_all, timeout)
WAIT_OBJECT_0 = 0
INFINITE = -1
WAIT_TIMEOUT = 0x102

View file

@ -0,0 +1,228 @@
import ctypes.wintypes
CreateSymbolicLink = ctypes.windll.kernel32.CreateSymbolicLinkW
CreateSymbolicLink.argtypes = (
ctypes.wintypes.LPWSTR,
ctypes.wintypes.LPWSTR,
ctypes.wintypes.DWORD,
)
CreateSymbolicLink.restype = ctypes.wintypes.BOOLEAN
CreateHardLink = ctypes.windll.kernel32.CreateHardLinkW
CreateHardLink.argtypes = (
ctypes.wintypes.LPWSTR,
ctypes.wintypes.LPWSTR,
ctypes.wintypes.LPVOID, # reserved for LPSECURITY_ATTRIBUTES
)
CreateHardLink.restype = ctypes.wintypes.BOOLEAN
GetFileAttributes = ctypes.windll.kernel32.GetFileAttributesW
GetFileAttributes.argtypes = ctypes.wintypes.LPWSTR,
GetFileAttributes.restype = ctypes.wintypes.DWORD
SetFileAttributes = ctypes.windll.kernel32.SetFileAttributesW
SetFileAttributes.argtypes = ctypes.wintypes.LPWSTR, ctypes.wintypes.DWORD
SetFileAttributes.restype = ctypes.wintypes.BOOL
MAX_PATH = 260
GetFinalPathNameByHandle = ctypes.windll.kernel32.GetFinalPathNameByHandleW
GetFinalPathNameByHandle.argtypes = (
ctypes.wintypes.HANDLE, ctypes.wintypes.LPWSTR, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD,
)
GetFinalPathNameByHandle.restype = ctypes.wintypes.DWORD
class SECURITY_ATTRIBUTES(ctypes.Structure):
_fields_ = (
('length', ctypes.wintypes.DWORD),
('p_security_descriptor', ctypes.wintypes.LPVOID),
('inherit_handle', ctypes.wintypes.BOOLEAN),
)
LPSECURITY_ATTRIBUTES = ctypes.POINTER(SECURITY_ATTRIBUTES)
CreateFile = ctypes.windll.kernel32.CreateFileW
CreateFile.argtypes = (
ctypes.wintypes.LPWSTR,
ctypes.wintypes.DWORD,
ctypes.wintypes.DWORD,
LPSECURITY_ATTRIBUTES,
ctypes.wintypes.DWORD,
ctypes.wintypes.DWORD,
ctypes.wintypes.HANDLE,
)
CreateFile.restype = ctypes.wintypes.HANDLE
FILE_SHARE_READ = 1
FILE_SHARE_WRITE = 2
FILE_SHARE_DELETE = 4
FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000
FILE_FLAG_BACKUP_SEMANTICS = 0x2000000
NULL = 0
OPEN_EXISTING = 3
FILE_ATTRIBUTE_READONLY = 0x1
FILE_ATTRIBUTE_HIDDEN = 0x2
FILE_ATTRIBUTE_DIRECTORY = 0x10
FILE_ATTRIBUTE_NORMAL = 0x80
FILE_ATTRIBUTE_REPARSE_POINT = 0x400
GENERIC_READ = 0x80000000
FILE_READ_ATTRIBUTES = 0x80
INVALID_HANDLE_VALUE = ctypes.wintypes.HANDLE(-1).value
INVALID_FILE_ATTRIBUTES = 0xFFFFFFFF
ERROR_NO_MORE_FILES = 0x12
VOLUME_NAME_DOS = 0
CloseHandle = ctypes.windll.kernel32.CloseHandle
CloseHandle.argtypes = (ctypes.wintypes.HANDLE,)
CloseHandle.restype = ctypes.wintypes.BOOLEAN
class WIN32_FIND_DATA(ctypes.Structure):
_fields_ = [
('file_attributes', ctypes.wintypes.DWORD),
('creation_time', ctypes.wintypes.FILETIME),
('last_access_time', ctypes.wintypes.FILETIME),
('last_write_time', ctypes.wintypes.FILETIME),
('file_size_words', ctypes.wintypes.DWORD*2),
('reserved', ctypes.wintypes.DWORD*2),
('filename', ctypes.wintypes.WCHAR*MAX_PATH),
('alternate_filename', ctypes.wintypes.WCHAR*14),
]
@property
def file_size(self):
return ctypes.cast(self.file_size_words, ctypes.POINTER(ctypes.c_uint64)).contents
LPWIN32_FIND_DATA = ctypes.POINTER(WIN32_FIND_DATA)
FindFirstFile = ctypes.windll.kernel32.FindFirstFileW
FindFirstFile.argtypes = (ctypes.wintypes.LPWSTR, LPWIN32_FIND_DATA)
FindFirstFile.restype = ctypes.wintypes.HANDLE
FindNextFile = ctypes.windll.kernel32.FindNextFileW
FindNextFile.argtypes = (ctypes.wintypes.HANDLE, LPWIN32_FIND_DATA)
FindNextFile.restype = ctypes.wintypes.BOOLEAN
SCS_32BIT_BINARY = 0 # A 32-bit Windows-based application
SCS_64BIT_BINARY = 6 # A 64-bit Windows-based application
SCS_DOS_BINARY = 1 # An MS-DOS-based application
SCS_OS216_BINARY = 5 # A 16-bit OS/2-based application
SCS_PIF_BINARY = 3 # A PIF file that executes an MS-DOS-based application
SCS_POSIX_BINARY = 4 # A POSIX-based application
SCS_WOW_BINARY = 2 # A 16-bit Windows-based application
_GetBinaryType = ctypes.windll.kernel32.GetBinaryTypeW
_GetBinaryType.argtypes = (ctypes.wintypes.LPWSTR, ctypes.POINTER(ctypes.wintypes.DWORD))
_GetBinaryType.restype = ctypes.wintypes.BOOL
FILEOP_FLAGS = ctypes.wintypes.WORD
class SHFILEOPSTRUCT(ctypes.Structure):
_fields_ = [
('status_dialog', ctypes.wintypes.HWND),
('operation', ctypes.wintypes.UINT),
('from_', ctypes.wintypes.LPWSTR),
('to', ctypes.wintypes.LPWSTR),
('flags', FILEOP_FLAGS),
('operations_aborted', ctypes.wintypes.BOOL),
('name_mapping_handles', ctypes.wintypes.LPVOID),
('progress_title', ctypes.wintypes.LPWSTR),
]
_SHFileOperation = ctypes.windll.shell32.SHFileOperationW
_SHFileOperation.argtypes = [ctypes.POINTER(SHFILEOPSTRUCT)]
_SHFileOperation.restype = ctypes.c_int
FOF_ALLOWUNDO = 64
FOF_NOCONFIRMATION = 16
FO_DELETE = 3
ReplaceFile = ctypes.windll.kernel32.ReplaceFileW
ReplaceFile.restype = ctypes.wintypes.BOOL
ReplaceFile.argtypes = [
ctypes.wintypes.LPWSTR,
ctypes.wintypes.LPWSTR,
ctypes.wintypes.LPWSTR,
ctypes.wintypes.DWORD,
ctypes.wintypes.LPVOID,
ctypes.wintypes.LPVOID,
]
REPLACEFILE_WRITE_THROUGH = 0x1
REPLACEFILE_IGNORE_MERGE_ERRORS = 0x2
REPLACEFILE_IGNORE_ACL_ERRORS = 0x4
class STAT_STRUCT(ctypes.Structure):
_fields_ = [
('dev', ctypes.c_uint),
('ino', ctypes.c_ushort),
('mode', ctypes.c_ushort),
('nlink', ctypes.c_short),
('uid', ctypes.c_short),
('gid', ctypes.c_short),
('rdev', ctypes.c_uint),
# the following 4 fields are ctypes.c_uint64 for _stat64
('size', ctypes.c_uint),
('atime', ctypes.c_uint),
('mtime', ctypes.c_uint),
('ctime', ctypes.c_uint),
]
_wstat = ctypes.windll.msvcrt._wstat
_wstat.argtypes = [ctypes.wintypes.LPWSTR, ctypes.POINTER(STAT_STRUCT)]
_wstat.restype = ctypes.c_int
FILE_NOTIFY_CHANGE_LAST_WRITE = 0x10
FindFirstChangeNotification = ctypes.windll.kernel32.FindFirstChangeNotificationW
FindFirstChangeNotification.argtypes = ctypes.wintypes.LPWSTR, ctypes.wintypes.BOOL, ctypes.wintypes.DWORD
FindFirstChangeNotification.restype = ctypes.wintypes.HANDLE
FindCloseChangeNotification = ctypes.windll.kernel32.FindCloseChangeNotification
FindCloseChangeNotification.argtypes = ctypes.wintypes.HANDLE,
FindCloseChangeNotification.restype = ctypes.wintypes.BOOL
FindNextChangeNotification = ctypes.windll.kernel32.FindNextChangeNotification
FindNextChangeNotification.argtypes = ctypes.wintypes.HANDLE,
FindNextChangeNotification.restype = ctypes.wintypes.BOOL
FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000
IO_REPARSE_TAG_SYMLINK = 0xA000000C
FSCTL_GET_REPARSE_POINT = 0x900a8
LPDWORD = ctypes.POINTER(ctypes.wintypes.DWORD)
LPOVERLAPPED = ctypes.wintypes.LPVOID
DeviceIoControl = ctypes.windll.kernel32.DeviceIoControl
DeviceIoControl.argtypes = [
ctypes.wintypes.HANDLE,
ctypes.wintypes.DWORD,
ctypes.wintypes.LPVOID,
ctypes.wintypes.DWORD,
ctypes.wintypes.LPVOID,
ctypes.wintypes.DWORD,
LPDWORD,
LPOVERLAPPED,
]
DeviceIoControl.restype = ctypes.wintypes.BOOL
class REPARSE_DATA_BUFFER(ctypes.Structure):
_fields_ = [
('tag', ctypes.c_ulong),
('data_length', ctypes.c_ushort),
('reserved', ctypes.c_ushort),
('substitute_name_offset', ctypes.c_ushort),
('substitute_name_length', ctypes.c_ushort),
('print_name_offset', ctypes.c_ushort),
('print_name_length', ctypes.c_ushort),
('flags', ctypes.c_ulong),
('path_buffer', ctypes.c_byte*1),
]
def get_print_name(self):
wchar_size = ctypes.sizeof(ctypes.wintypes.WCHAR)
arr_typ = ctypes.wintypes.WCHAR*(self.print_name_length//wchar_size)
data = ctypes.byref(self.path_buffer, self.print_name_offset)
return ctypes.cast(data, ctypes.POINTER(arr_typ)).contents.value
def get_substitute_name(self):
wchar_size = ctypes.sizeof(ctypes.wintypes.WCHAR)
arr_typ = ctypes.wintypes.WCHAR*(self.substitute_name_length//wchar_size)
data = ctypes.byref(self.path_buffer, self.substitute_name_offset)
return ctypes.cast(data, ctypes.POINTER(arr_typ)).contents.value

View file

@ -0,0 +1,203 @@
import struct
import ctypes.wintypes
from ctypes.wintypes import DWORD, WCHAR, BYTE, BOOL
# from mprapi.h
MAX_INTERFACE_NAME_LEN = 2**8
# from iprtrmib.h
MAXLEN_PHYSADDR = 2**3
MAXLEN_IFDESCR = 2**8
# from iptypes.h
MAX_ADAPTER_ADDRESS_LENGTH = 8
MAX_DHCPV6_DUID_LENGTH = 130
class MIB_IFROW(ctypes.Structure):
_fields_ = (
('name', WCHAR*MAX_INTERFACE_NAME_LEN),
('index', DWORD),
('type', DWORD),
('MTU', DWORD),
('speed', DWORD),
('physical_address_length', DWORD),
('physical_address_raw', BYTE*MAXLEN_PHYSADDR),
('admin_status', DWORD),
('operational_status', DWORD),
('last_change', DWORD),
('octets_received', DWORD),
('unicast_packets_received', DWORD),
('non_unicast_packets_received', DWORD),
('incoming_discards', DWORD),
('incoming_errors', DWORD),
('incoming_unknown_protocols', DWORD),
('octets_sent', DWORD),
('unicast_packets_sent', DWORD),
('non_unicast_packets_sent', DWORD),
('outgoing_discards', DWORD),
('outgoing_errors', DWORD),
('outgoing_queue_length', DWORD),
('description_length', DWORD),
('description_raw', ctypes.c_char*MAXLEN_IFDESCR),
)
def _get_binary_property(self, name):
val_prop = '{0}_raw'.format(name)
val = getattr(self, val_prop)
len_prop = '{0}_length'.format(name)
length = getattr(self, len_prop)
return str(buffer(val))[:length]
@property
def physical_address(self):
return self._get_binary_property('physical_address')
@property
def description(self):
return self._get_binary_property('description')
class MIB_IFTABLE(ctypes.Structure):
_fields_ = (
('num_entries', DWORD), # dwNumEntries
('entries', MIB_IFROW*0), # table
)
class MIB_IPADDRROW(ctypes.Structure):
_fields_ = (
('address_num', DWORD),
('index', DWORD),
('mask', DWORD),
('broadcast_address', DWORD),
('reassembly_size', DWORD),
('unused', ctypes.c_ushort),
('type', ctypes.c_ushort),
)
@property
def address(self):
"The address in big-endian"
_ = struct.pack('L', self.address_num)
return struct.unpack('!L', _)[0]
class MIB_IPADDRTABLE(ctypes.Structure):
_fields_ = (
('num_entries', DWORD),
('entries', MIB_IPADDRROW*0),
)
class SOCKADDR(ctypes.Structure):
_fields_ = (
('family', ctypes.c_ushort),
('data', ctypes.c_byte*14),
)
LPSOCKADDR = ctypes.POINTER(SOCKADDR)
class SOCKET_ADDRESS(ctypes.Structure):
_fields_ = [
('address', LPSOCKADDR),
('length', ctypes.c_int),
]
class _IP_ADAPTER_ADDRESSES_METRIC(ctypes.Structure):
_fields_ = [
('length', ctypes.c_ulong),
('interface_index', DWORD),
]
class _IP_ADAPTER_ADDRESSES_U1(ctypes.Union):
_fields_ = [
('alignment', ctypes.c_ulonglong),
('metric', _IP_ADAPTER_ADDRESSES_METRIC),
]
class IP_ADAPTER_ADDRESSES(ctypes.Structure):
pass
LP_IP_ADAPTER_ADDRESSES = ctypes.POINTER(IP_ADAPTER_ADDRESSES)
# for now, just use void * for pointers to unused structures
PIP_ADAPTER_UNICAST_ADDRESS = ctypes.c_void_p
PIP_ADAPTER_ANYCAST_ADDRESS = ctypes.c_void_p
PIP_ADAPTER_MULTICAST_ADDRESS = ctypes.c_void_p
PIP_ADAPTER_DNS_SERVER_ADDRESS = ctypes.c_void_p
PIP_ADAPTER_PREFIX = ctypes.c_void_p
PIP_ADAPTER_WINS_SERVER_ADDRESS_LH = ctypes.c_void_p
PIP_ADAPTER_GATEWAY_ADDRESS_LH = ctypes.c_void_p
PIP_ADAPTER_DNS_SUFFIX = ctypes.c_void_p
IF_OPER_STATUS = ctypes.c_uint # this is an enum, consider http://code.activestate.com/recipes/576415/
IF_LUID = ctypes.c_uint64
NET_IF_COMPARTMENT_ID = ctypes.c_uint32
GUID = ctypes.c_byte*16
NET_IF_NETWORK_GUID = GUID
NET_IF_CONNECTION_TYPE = ctypes.c_uint # enum
TUNNEL_TYPE = ctypes.c_uint # enum
IP_ADAPTER_ADDRESSES._fields_ = [
#('u', _IP_ADAPTER_ADDRESSES_U1),
('length', ctypes.c_ulong),
('interface_index', DWORD),
('next', LP_IP_ADAPTER_ADDRESSES),
('adapter_name', ctypes.c_char_p),
('first_unicast_address', PIP_ADAPTER_UNICAST_ADDRESS),
('first_anycast_address', PIP_ADAPTER_ANYCAST_ADDRESS),
('first_multicast_address', PIP_ADAPTER_MULTICAST_ADDRESS),
('first_dns_server_address', PIP_ADAPTER_DNS_SERVER_ADDRESS),
('dns_suffix', ctypes.c_wchar_p),
('description', ctypes.c_wchar_p),
('friendly_name', ctypes.c_wchar_p),
('byte', BYTE*MAX_ADAPTER_ADDRESS_LENGTH),
('physical_address_length', DWORD),
('flags', DWORD),
('mtu', DWORD),
('interface_type', DWORD),
('oper_status', IF_OPER_STATUS),
('ipv6_interface_index', DWORD),
('zone_indices', DWORD),
('first_prefix', PIP_ADAPTER_PREFIX),
('transmit_link_speed', ctypes.c_uint64),
('receive_link_speed', ctypes.c_uint64),
('first_wins_server_address', PIP_ADAPTER_WINS_SERVER_ADDRESS_LH),
('first_gateway_address', PIP_ADAPTER_GATEWAY_ADDRESS_LH),
('ipv4_metric', ctypes.c_ulong),
('ipv6_metric', ctypes.c_ulong),
('luid', IF_LUID),
('dhcpv4_server', SOCKET_ADDRESS),
('compartment_id', NET_IF_COMPARTMENT_ID),
('network_guid', NET_IF_NETWORK_GUID),
('connection_type', NET_IF_CONNECTION_TYPE),
('tunnel_type', TUNNEL_TYPE),
('dhcpv6_server', SOCKET_ADDRESS),
('dhcpv6_client_duid', ctypes.c_byte*MAX_DHCPV6_DUID_LENGTH),
('dhcpv6_client_duid_length', ctypes.c_ulong),
('dhcpv6_iaid', ctypes.c_ulong),
('first_dns_suffix', PIP_ADAPTER_DNS_SUFFIX),
]
# define some parameters to the API Functions
GetIfTable = ctypes.windll.iphlpapi.GetIfTable
GetIfTable.argtypes = [
ctypes.POINTER(MIB_IFTABLE),
ctypes.POINTER(ctypes.c_ulong),
BOOL,
]
GetIfTable.restype = DWORD
GetIpAddrTable = ctypes.windll.iphlpapi.GetIpAddrTable
GetIpAddrTable.argtypes = [
ctypes.POINTER(MIB_IPADDRTABLE),
ctypes.POINTER(ctypes.c_ulong),
BOOL,
]
GetIpAddrTable.restype = DWORD
GetAdaptersAddresses = ctypes.windll.iphlpapi.GetAdaptersAddresses
GetAdaptersAddresses.argtypes = [
ctypes.c_ulong,
ctypes.c_ulong,
ctypes.c_void_p,
ctypes.POINTER(IP_ADAPTER_ADDRESSES),
ctypes.POINTER(ctypes.c_ulong),
]
GetAdaptersAddresses.restype = ctypes.c_ulong

View file

@ -0,0 +1,9 @@
import ctypes.wintypes
GetModuleFileName = ctypes.windll.kernel32.GetModuleFileNameW
GetModuleFileName.argtypes = (
ctypes.wintypes.HANDLE,
ctypes.wintypes.LPWSTR,
ctypes.wintypes.DWORD,
)
GetModuleFileName.restype = ctypes.wintypes.DWORD

View file

@ -0,0 +1,33 @@
import ctypes.wintypes
GMEM_MOVEABLE = 0x2
GlobalAlloc = ctypes.windll.kernel32.GlobalAlloc
GlobalAlloc.argtypes = ctypes.wintypes.UINT, ctypes.c_ssize_t
GlobalAlloc.restype = ctypes.wintypes.HANDLE
GlobalLock = ctypes.windll.kernel32.GlobalLock
GlobalLock.argtypes = ctypes.wintypes.HGLOBAL,
GlobalLock.restype = ctypes.wintypes.LPVOID
GlobalUnlock = ctypes.windll.kernel32.GlobalUnlock
GlobalUnlock.argtypes = ctypes.wintypes.HGLOBAL,
GlobalUnlock.restype = ctypes.wintypes.BOOL
GlobalSize = ctypes.windll.kernel32.GlobalSize
GlobalSize.argtypes = ctypes.wintypes.HGLOBAL,
GlobalSize.restype = ctypes.c_size_t
CreateFileMapping = ctypes.windll.kernel32.CreateFileMappingW
CreateFileMapping.argtypes = [
ctypes.wintypes.HANDLE,
ctypes.c_void_p,
ctypes.wintypes.DWORD,
ctypes.wintypes.DWORD,
ctypes.wintypes.DWORD,
ctypes.wintypes.LPWSTR,
]
CreateFileMapping.restype = ctypes.wintypes.HANDLE
MapViewOfFile = ctypes.windll.kernel32.MapViewOfFile
MapViewOfFile.restype = ctypes.wintypes.HANDLE

View file

@ -0,0 +1,50 @@
#!/usr/bin/env python
"""
jaraco.windows.message
Windows Messaging support
"""
import ctypes
from ctypes.wintypes import HWND, UINT, WPARAM, LPARAM, DWORD, LPVOID
import six
LRESULT = LPARAM
class LPARAM_wstr(LPARAM):
"""
A special instance of LPARAM that can be constructed from a string
instance (for functions such as SendMessage, whose LPARAM may point to
a unicode string).
"""
@classmethod
def from_param(cls, param):
if isinstance(param, six.string_types):
return LPVOID.from_param(six.text_type(param))
return LPARAM.from_param(param)
SendMessage = ctypes.windll.user32.SendMessageW
SendMessage.argtypes = (HWND, UINT, WPARAM, LPARAM_wstr)
SendMessage.restype = LRESULT
HWND_BROADCAST=0xFFFF
WM_SETTINGCHANGE=0x1A
# constants from http://msdn.microsoft.com/en-us/library/ms644952%28v=vs.85%29.aspx
SMTO_ABORTIFHUNG = 0x02
SMTO_BLOCK = 0x01
SMTO_NORMAL = 0x00
SMTO_NOTIMEOUTIFNOTHUNG = 0x08
SMTO_ERRORONEXIT = 0x20
SendMessageTimeout = ctypes.windll.user32.SendMessageTimeoutW
SendMessageTimeout.argtypes = SendMessage.argtypes + (
UINT, UINT, ctypes.POINTER(DWORD)
)
SendMessageTimeout.restype = LRESULT
def unicode_as_lparam(source):
pointer = ctypes.cast(ctypes.c_wchar_p(source), ctypes.c_void_p)
return LPARAM(pointer.value)

View file

@ -0,0 +1,27 @@
import ctypes.wintypes
# MPR - Multiple Provider Router
mpr = ctypes.windll.mpr
RESOURCETYPE_ANY = 0
class NETRESOURCE(ctypes.Structure):
_fields_ = [
('scope', ctypes.wintypes.DWORD),
('type', ctypes.wintypes.DWORD),
('display_type', ctypes.wintypes.DWORD),
('usage', ctypes.wintypes.DWORD),
('local_name', ctypes.wintypes.LPWSTR),
('remote_name', ctypes.wintypes.LPWSTR),
('comment', ctypes.wintypes.LPWSTR),
('provider', ctypes.wintypes.LPWSTR),
]
LPNETRESOURCE = ctypes.POINTER(NETRESOURCE)
WNetAddConnection2 = mpr.WNetAddConnection2W
WNetAddConnection2.argtypes = (
LPNETRESOURCE,
ctypes.wintypes.LPCWSTR,
ctypes.wintypes.LPCWSTR,
ctypes.wintypes.DWORD,
)

View file

@ -0,0 +1,33 @@
import ctypes.wintypes
class SYSTEM_POWER_STATUS(ctypes.Structure):
_fields_ = (
('ac_line_status', ctypes.wintypes.BYTE),
('battery_flag', ctypes.wintypes.BYTE),
('battery_life_percent', ctypes.wintypes.BYTE),
('reserved', ctypes.wintypes.BYTE),
('battery_life_time', ctypes.wintypes.DWORD),
('battery_full_life_time', ctypes.wintypes.DWORD),
)
@property
def ac_line_status_string(self):
return {0:'offline', 1: 'online', 255: 'unknown'}[self.ac_line_status]
LPSYSTEM_POWER_STATUS = ctypes.POINTER(SYSTEM_POWER_STATUS)
GetSystemPowerStatus = ctypes.windll.kernel32.GetSystemPowerStatus
GetSystemPowerStatus.argtypes = LPSYSTEM_POWER_STATUS,
GetSystemPowerStatus.restype = ctypes.wintypes.BOOL
SetThreadExecutionState = ctypes.windll.kernel32.SetThreadExecutionState
SetThreadExecutionState.argtypes = [ctypes.c_uint]
SetThreadExecutionState.restype = ctypes.c_uint
class ES:
"""
Execution state constants
"""
continuous = 0x80000000
system_required = 1
display_required = 2
awaymode_required = 0x40

View file

@ -0,0 +1,106 @@
import ctypes.wintypes
class LUID(ctypes.Structure):
_fields_ = [
('low_part', ctypes.wintypes.DWORD),
('high_part', ctypes.wintypes.LONG),
]
def __eq__(self, other):
return (
self.high_part == other.high_part and
self.low_part == other.low_part
)
def __ne__(self, other):
return not (self==other)
LookupPrivilegeValue = ctypes.windll.advapi32.LookupPrivilegeValueW
LookupPrivilegeValue.argtypes = (
ctypes.wintypes.LPWSTR, # system name
ctypes.wintypes.LPWSTR, # name
ctypes.POINTER(LUID),
)
LookupPrivilegeValue.restype = ctypes.wintypes.BOOL
class TOKEN_INFORMATION_CLASS:
TokenUser = 1
TokenGroups = 2
TokenPrivileges = 3
# ... see http://msdn.microsoft.com/en-us/library/aa379626%28VS.85%29.aspx
SE_PRIVILEGE_ENABLED_BY_DEFAULT = 0x00000001
SE_PRIVILEGE_ENABLED = 0x00000002
SE_PRIVILEGE_REMOVED = 0x00000004
SE_PRIVILEGE_USED_FOR_ACCESS = 0x80000000
class LUID_AND_ATTRIBUTES(ctypes.Structure):
_fields_ = [
('LUID', LUID),
('attributes', ctypes.wintypes.DWORD),
]
def is_enabled(self):
return bool(self.attributes & SE_PRIVILEGE_ENABLED)
def enable(self):
self.attributes |= SE_PRIVILEGE_ENABLED
def get_name(self):
size = ctypes.wintypes.DWORD(10240)
buf = ctypes.create_unicode_buffer(size.value)
res = LookupPrivilegeName(None, self.LUID, buf, size)
if res == 0: raise RuntimeError
return buf[:size.value]
def __str__(self):
res = self.get_name()
if self.is_enabled(): res += ' (enabled)'
return res
LookupPrivilegeName = ctypes.windll.advapi32.LookupPrivilegeNameW
LookupPrivilegeName.argtypes = (
ctypes.wintypes.LPWSTR, # lpSystemName
ctypes.POINTER(LUID), # lpLuid
ctypes.wintypes.LPWSTR, # lpName
ctypes.POINTER(ctypes.wintypes.DWORD), # cchName
)
LookupPrivilegeName.restype = ctypes.wintypes.BOOL
class TOKEN_PRIVILEGES(ctypes.Structure):
_fields_ = [
('count', ctypes.wintypes.DWORD),
('privileges', LUID_AND_ATTRIBUTES*0),
]
def get_array(self):
array_type = LUID_AND_ATTRIBUTES*self.count
privileges = ctypes.cast(self.privileges, ctypes.POINTER(array_type)).contents
return privileges
def __iter__(self):
return iter(self.get_array())
PTOKEN_PRIVILEGES = ctypes.POINTER(TOKEN_PRIVILEGES)
GetTokenInformation = ctypes.windll.advapi32.GetTokenInformation
GetTokenInformation.argtypes = [
ctypes.wintypes.HANDLE, # TokenHandle
ctypes.c_uint, # TOKEN_INFORMATION_CLASS value
ctypes.c_void_p, # TokenInformation
ctypes.wintypes.DWORD, # TokenInformationLength
ctypes.POINTER(ctypes.wintypes.DWORD), # ReturnLength
]
GetTokenInformation.restype = ctypes.wintypes.BOOL
# http://msdn.microsoft.com/en-us/library/aa375202%28VS.85%29.aspx
AdjustTokenPrivileges = ctypes.windll.advapi32.AdjustTokenPrivileges
AdjustTokenPrivileges.restype = ctypes.wintypes.BOOL
AdjustTokenPrivileges.argtypes = [
ctypes.wintypes.HANDLE, # TokenHandle
ctypes.wintypes.BOOL, # DisableAllPrivileges
PTOKEN_PRIVILEGES, # NewState (optional)
ctypes.wintypes.DWORD, # BufferLength of PreviousState
PTOKEN_PRIVILEGES, # PreviousState (out, optional)
ctypes.POINTER(ctypes.wintypes.DWORD), # ReturnLength
]

View file

@ -0,0 +1,9 @@
import ctypes.wintypes
TOKEN_ALL_ACCESS = 0xf01ff
GetCurrentProcess = ctypes.windll.kernel32.GetCurrentProcess
GetCurrentProcess.restype = ctypes.wintypes.HANDLE
OpenProcessToken = ctypes.windll.advapi32.OpenProcessToken
OpenProcessToken.argtypes = (ctypes.wintypes.HANDLE, ctypes.wintypes.DWORD, ctypes.POINTER(ctypes.wintypes.HANDLE))
OpenProcessToken.restype = ctypes.wintypes.BOOL

View file

@ -0,0 +1,128 @@
import ctypes.wintypes
# from WinNT.h
READ_CONTROL = 0x00020000
STANDARD_RIGHTS_REQUIRED = 0x000F0000
STANDARD_RIGHTS_READ = READ_CONTROL
STANDARD_RIGHTS_WRITE = READ_CONTROL
STANDARD_RIGHTS_EXECUTE = READ_CONTROL
STANDARD_RIGHTS_ALL = 0x001F0000
# from NTSecAPI.h
POLICY_VIEW_LOCAL_INFORMATION = 0x00000001
POLICY_VIEW_AUDIT_INFORMATION = 0x00000002
POLICY_GET_PRIVATE_INFORMATION = 0x00000004
POLICY_TRUST_ADMIN = 0x00000008
POLICY_CREATE_ACCOUNT = 0x00000010
POLICY_CREATE_SECRET = 0x00000020
POLICY_CREATE_PRIVILEGE = 0x00000040
POLICY_SET_DEFAULT_QUOTA_LIMITS = 0x00000080
POLICY_SET_AUDIT_REQUIREMENTS = 0x00000100
POLICY_AUDIT_LOG_ADMIN = 0x00000200
POLICY_SERVER_ADMIN = 0x00000400
POLICY_LOOKUP_NAMES = 0x00000800
POLICY_NOTIFICATION = 0x00001000
POLICY_ALL_ACCESS = (
STANDARD_RIGHTS_REQUIRED |
POLICY_VIEW_LOCAL_INFORMATION |
POLICY_VIEW_AUDIT_INFORMATION |
POLICY_GET_PRIVATE_INFORMATION |
POLICY_TRUST_ADMIN |
POLICY_CREATE_ACCOUNT |
POLICY_CREATE_SECRET |
POLICY_CREATE_PRIVILEGE |
POLICY_SET_DEFAULT_QUOTA_LIMITS |
POLICY_SET_AUDIT_REQUIREMENTS |
POLICY_AUDIT_LOG_ADMIN |
POLICY_SERVER_ADMIN |
POLICY_LOOKUP_NAMES)
POLICY_READ = (
STANDARD_RIGHTS_READ |
POLICY_VIEW_AUDIT_INFORMATION |
POLICY_GET_PRIVATE_INFORMATION)
POLICY_WRITE = (
STANDARD_RIGHTS_WRITE |
POLICY_TRUST_ADMIN |
POLICY_CREATE_ACCOUNT |
POLICY_CREATE_SECRET |
POLICY_CREATE_PRIVILEGE |
POLICY_SET_DEFAULT_QUOTA_LIMITS |
POLICY_SET_AUDIT_REQUIREMENTS |
POLICY_AUDIT_LOG_ADMIN |
POLICY_SERVER_ADMIN)
POLICY_EXECUTE = (
STANDARD_RIGHTS_EXECUTE |
POLICY_VIEW_LOCAL_INFORMATION |
POLICY_LOOKUP_NAMES)
class TokenAccess:
TOKEN_QUERY = 0x8
class TokenInformationClass:
TokenUser = 1
class TOKEN_USER(ctypes.Structure):
num = 1
_fields_ = [
('SID', ctypes.c_void_p),
('ATTRIBUTES', ctypes.wintypes.DWORD),
]
class SECURITY_DESCRIPTOR(ctypes.Structure):
"""
typedef struct _SECURITY_DESCRIPTOR
{
UCHAR Revision;
UCHAR Sbz1;
SECURITY_DESCRIPTOR_CONTROL Control;
PSID Owner;
PSID Group;
PACL Sacl;
PACL Dacl;
} SECURITY_DESCRIPTOR;
"""
SECURITY_DESCRIPTOR_CONTROL = ctypes.wintypes.USHORT
REVISION = 1
_fields_ = [
('Revision', ctypes.c_ubyte),
('Sbz1', ctypes.c_ubyte),
('Control', SECURITY_DESCRIPTOR_CONTROL),
('Owner', ctypes.c_void_p),
('Group', ctypes.c_void_p),
('Sacl', ctypes.c_void_p),
('Dacl', ctypes.c_void_p),
]
class SECURITY_ATTRIBUTES(ctypes.Structure):
"""
typedef struct _SECURITY_ATTRIBUTES {
DWORD nLength;
LPVOID lpSecurityDescriptor;
BOOL bInheritHandle;
} SECURITY_ATTRIBUTES;
"""
_fields_ = [
('nLength', ctypes.wintypes.DWORD),
('lpSecurityDescriptor', ctypes.c_void_p),
('bInheritHandle', ctypes.wintypes.BOOL),
]
def __init__(self, *args, **kwargs):
super(SECURITY_ATTRIBUTES, self).__init__(*args, **kwargs)
self.nLength = ctypes.sizeof(SECURITY_ATTRIBUTES)
@property
def descriptor(self):
return self._descriptor
@descriptor.setter
def descriptor(self, value):
self._descriptor = value
self.lpSecurityDescriptor = ctypes.addressof(value)

View file

@ -0,0 +1,122 @@
import ctypes.wintypes
BOOL = ctypes.wintypes.BOOL
class SHELLSTATE(ctypes.Structure):
_fields_ = [
('show_all_objects', BOOL, 1),
('show_extensions', BOOL, 1),
('no_confirm_recycle', BOOL, 1),
('show_sys_files', BOOL, 1),
('show_comp_color', BOOL, 1),
('double_click_in_web_view', BOOL, 1),
('desktop_HTML', BOOL, 1),
('win95_classic', BOOL, 1),
('dont_pretty_path', BOOL, 1),
('show_attrib_col', BOOL, 1),
('map_network_drive_button', BOOL, 1),
('show_info_tip', BOOL, 1),
('hide_icons', BOOL, 1),
('web_view', BOOL, 1),
('filter', BOOL, 1),
('show_super_hidden', BOOL, 1),
('no_net_crawling', BOOL, 1),
('win95_unused', ctypes.wintypes.DWORD),
('param_sort', ctypes.wintypes.LONG),
('sort_direction', ctypes.c_int),
('version', ctypes.wintypes.UINT),
('not_used', ctypes.wintypes.UINT),
('sep_process', BOOL, 1),
('start_panel_on', BOOL, 1),
('show_start_page', BOOL, 1),
('auto_check_select', BOOL, 1),
('icons_only', BOOL, 1),
('show_type_overlay', BOOL, 1),
('spare_flags', ctypes.wintypes.UINT, 13),
]
SSF_SHOWALLOBJECTS = 0x00000001
"The fShowAllObjects member is being requested."
SSF_SHOWEXTENSIONS = 0x00000002
"The fShowExtensions member is being requested."
SSF_HIDDENFILEEXTS = 0x00000004
"Not used."
SSF_SERVERADMINUI = 0x00000004
"Not used."
SSF_SHOWCOMPCOLOR = 0x00000008
"The fShowCompColor member is being requested."
SSF_SORTCOLUMNS = 0x00000010
"The lParamSort and iSortDirection members are being requested."
SSF_SHOWSYSFILES = 0x00000020
"The fShowSysFiles member is being requested."
SSF_DOUBLECLICKINWEBVIEW = 0x00000080
"The fDoubleClickInWebView member is being requested."
SSF_SHOWATTRIBCOL = 0x00000100
"The fShowAttribCol member is being requested. (Windows Vista: Not used.)"
SSF_DESKTOPHTML = 0x00000200
"The fDesktopHTML member is being requested. Set is not available. Instead, for versions of Microsoft Windows prior to Windows XP, enable Desktop HTML by IActiveDesktop. The use of IActiveDesktop for this purpose, however, is not recommended for Windows XP and later versions of Windows, and is deprecated in Windows Vista."
SSF_WIN95CLASSIC = 0x00000400
"The fWin95Classic member is being requested."
SSF_DONTPRETTYPATH = 0x00000800
"The fDontPrettyPath member is being requested."
SSF_MAPNETDRVBUTTON = 0x00001000
"The fMapNetDrvBtn member is being requested."
SSF_SHOWINFOTIP = 0x00002000
"The fShowInfoTip member is being requested."
SSF_HIDEICONS = 0x00004000
"The fHideIcons member is being requested."
SSF_NOCONFIRMRECYCLE = 0x00008000
"The fNoConfirmRecycle member is being requested."
SSF_FILTER = 0x00010000
"The fFilter member is being requested. (Windows Vista: Not used.)"
SSF_WEBVIEW = 0x00020000
"The fWebView member is being requested."
SSF_SHOWSUPERHIDDEN = 0x00040000
"The fShowSuperHidden member is being requested."
SSF_SEPPROCESS = 0x00080000
"The fSepProcess member is being requested."
SSF_NONETCRAWLING = 0x00100000
"Windows XP and later. The fNoNetCrawling member is being requested."
SSF_STARTPANELON = 0x00200000
"Windows XP and later. The fStartPanelOn member is being requested."
SSF_SHOWSTARTPAGE = 0x00400000
"Not used."
SSF_AUTOCHECKSELECT = 0x00800000
"Windows Vista and later. The fAutoCheckSelect member is being requested."
SSF_ICONSONLY = 0x01000000
"Windows Vista and later. The fIconsOnly member is being requested."
SSF_SHOWTYPEOVERLAY = 0x02000000
"Windows Vista and later. The fShowTypeOverlay member is being requested."
SHGetSetSettings = ctypes.windll.shell32.SHGetSetSettings
SHGetSetSettings.argtypes = [
ctypes.POINTER(SHELLSTATE),
ctypes.wintypes.DWORD,
ctypes.wintypes.BOOL, # get or set (True: set)
]
SHGetSetSettings.restype = None

View file

@ -0,0 +1,14 @@
import ctypes.wintypes
SystemParametersInfo = ctypes.windll.user32.SystemParametersInfoW
SystemParametersInfo.argtypes = (
ctypes.wintypes.UINT,
ctypes.wintypes.UINT,
ctypes.c_void_p,
ctypes.wintypes.UINT,
)
SPI_GETACTIVEWINDOWTRACKING = 0x1000
SPI_SETACTIVEWINDOWTRACKING = 0x1001
SPI_GETACTIVEWNDTRKTIMEOUT = 0x2002
SPI_SETACTIVEWNDTRKTIMEOUT = 0x2003

View file

@ -0,0 +1,10 @@
import ctypes.wintypes
try:
from ctypes.wintypes import LPDWORD
except ImportError:
LPDWORD = ctypes.POINTER(ctypes.wintypes.DWORD)
GetUserName = ctypes.windll.advapi32.GetUserNameW
GetUserName.argtypes = ctypes.wintypes.LPWSTR, LPDWORD
GetUserName.restype = ctypes.wintypes.DWORD

View file

@ -0,0 +1,190 @@
from __future__ import with_statement, print_function
import sys
import re
import itertools
from contextlib import contextmanager
import io
import six
import ctypes
from ctypes import windll
from jaraco.windows.api import clipboard, memory
from jaraco.windows.error import handle_nonzero_success, WindowsError
from jaraco.windows.memory import LockedMemory
__all__ = (
'CF_TEXT', 'GetClipboardData', 'CloseClipboard',
'SetClipboardData', 'OpenClipboard',
)
def OpenClipboard(owner=None):
"""
Open the clipboard.
owner
[in] Handle to the window to be associated with the open clipboard.
If this parameter is None, the open clipboard is associated with the
current task.
"""
handle_nonzero_success(windll.user32.OpenClipboard(owner))
CloseClipboard = lambda: handle_nonzero_success(windll.user32.CloseClipboard())
data_handlers = dict()
def handles(*formats):
def register(func):
for format in formats:
data_handlers[format] = func
return func
return register
def nts(s):
"""
Null Terminated String
Get the portion of s up to a null character.
"""
result, null, rest = s.partition('\x00')
return result
@handles(clipboard.CF_DIBV5, clipboard.CF_DIB)
def raw_data(handle):
return LockedMemory(handle).data
@handles(clipboard.CF_TEXT)
def text_string(handle):
return nts(raw_data(handle))
@handles(clipboard.CF_UNICODETEXT)
def unicode_string(handle):
return nts(raw_data(handle).decode('utf-16'))
@handles(clipboard.CF_BITMAP)
def as_bitmap(handle):
# handle is HBITMAP
raise NotImplementedError("Can't convert to DIB")
# todo: use GetDIBits http://msdn.microsoft.com/en-us/library/dd144879%28v=VS.85%29.aspx
@handles(clipboard.CF_HTML)
class HTMLSnippet(object):
def __init__(self, handle):
self.data = text_string(handle)
self.headers = self.parse_headers(self.data)
@property
def html(self):
return self.data[self.headers['StartHTML']:]
@staticmethod
def parse_headers(data):
d = io.StringIO(data)
def header_line(line):
return re.match('(\w+):(.*)', line)
headers = itertools.imap(header_line, d)
# grab headers until they no longer match
headers = itertools.takewhile(bool, headers)
def best_type(value):
try:
return int(value)
except ValueError:
pass
try:
return float(value)
except ValueError:
pass
return value
pairs = (
(header.group(1), best_type(header.group(2)))
for header
in headers
)
return dict(pairs)
def GetClipboardData(type=clipboard.CF_UNICODETEXT):
if not type in data_handlers:
raise NotImplementedError("No support for data of type %d" % type)
handle = clipboard.GetClipboardData(type)
if handle is None:
raise TypeError("No clipboard data of type %d" % type)
return data_handlers[type](handle)
EmptyClipboard = lambda: handle_nonzero_success(windll.user32.EmptyClipboard())
def SetClipboardData(type, content):
"""
Modeled after http://msdn.microsoft.com/en-us/library/ms649016%28VS.85%29.aspx#_win32_Copying_Information_to_the_Clipboard
"""
allocators = {
clipboard.CF_TEXT: ctypes.create_string_buffer,
clipboard.CF_UNICODETEXT: ctypes.create_unicode_buffer,
}
if not type in allocators:
raise NotImplementedError("Only text types are supported at this time")
# allocate the memory for the data
content = allocators[type](content)
flags = memory.GMEM_MOVEABLE
size = ctypes.sizeof(content)
handle_to_copy = windll.kernel32.GlobalAlloc(flags, size)
with LockedMemory(handle_to_copy) as lm:
ctypes.memmove(lm.data_ptr, content, size)
result = clipboard.SetClipboardData(type, handle_to_copy)
if result is None:
raise WindowsError()
def set_text(source):
with context():
EmptyClipboard()
SetClipboardData(clipboard.CF_TEXT, source)
def get_text():
with context():
result = GetClipboardData(clipboard.CF_TEXT)
return result
def set_unicode_text(source):
with context():
EmptyClipboard()
SetClipboardData(clipboard.CF_UNICODETEXT, source)
def get_unicode_text():
with context():
return GetClipboardData()
def get_html():
with context():
result = GetClipboardData(clipboard.CF_HTML)
return result
def set_html(source):
with context():
EmptyClipboard()
SetClipboardData(clipboard.CF_UNICODETEXT, source)
def get_image():
with context():
return GetClipboardData(clipboard.CF_DIB)
def paste_stdout():
getter = get_unicode_text if six.PY3 else get_text
sys.stdout.write(getter())
def stdin_copy():
setter = set_unicode_text if six.PY3 else set_text
setter(sys.stdin.read())
@contextmanager
def context():
OpenClipboard()
try:
yield
finally:
CloseClipboard()
def get_formats():
with context():
format_index = 0
while True:
format_index = clipboard.EnumClipboardFormats(format_index)
if format_index == 0: break
yield format_index

View file

@ -0,0 +1,19 @@
import ctypes
import jaraco.windows.api.credential as api
from . import error
CRED_TYPE_GENERIC=1
def CredDelete(TargetName, Type, Flags=0):
error.handle_nonzero_success(api.CredDelete(TargetName, Type, Flags))
def CredRead(TargetName, Type, Flags=0):
cred_pointer = api.PCREDENTIAL()
res = api.CredRead(TargetName, Type, Flags, ctypes.byref(cred_pointer))
error.handle_nonzero_success(res)
return cred_pointer.contents
def CredWrite(Credential, Flags=0):
res = api.CredWrite(Credential, Flags)
error.handle_nonzero_success(res)

View file

@ -0,0 +1,146 @@
"""
Python routines to interface with the Microsoft
Data Protection API (DPAPI).
>>> orig_data = b'Ipsum Lorem...'
>>> ciphertext = CryptProtectData(orig_data)
>>> descr, data = CryptUnprotectData(ciphertext)
>>> data == orig_data
True
"""
import ctypes
from ctypes import wintypes
from jaraco.windows.error import handle_nonzero_success
class DATA_BLOB(ctypes.Structure):
r"""
A data blob structure for use with MS DPAPI functions.
Initialize with string of characters
>>> input = b'abc123\x00456'
>>> blob = DATA_BLOB(input)
>>> len(blob)
10
>>> blob.get_data() == input
True
"""
_fields_ = [
('data_size', wintypes.DWORD),
('data', ctypes.c_void_p),
]
def __init__(self, data=None):
super(DATA_BLOB, self).__init__()
self.set_data(data)
def set_data(self, data):
"Use this method to set the data for this blob"
if data is None:
self.data_size = 0
self.data = None
return
self.data_size = len(data)
# create a string buffer so that null bytes aren't interpreted
# as the end of the string
self.data = ctypes.cast(ctypes.create_string_buffer(data), ctypes.c_void_p)
def get_data(self):
"Get the data for this blob"
array = ctypes.POINTER(ctypes.c_char*len(self))
return ctypes.cast(self.data, array).contents.raw
def __len__(self):
return self.data_size
def __str__(self):
return self.get_data()
def free(self):
"""
"data out" blobs have locally-allocated memory.
Call this method to free the memory allocated by CryptProtectData
and CryptUnprotectData.
"""
ctypes.windll.kernel32.LocalFree(self.data)
p_DATA_BLOB = ctypes.POINTER(DATA_BLOB)
_CryptProtectData = ctypes.windll.crypt32.CryptProtectData
_CryptProtectData.argtypes = [
p_DATA_BLOB, # data in
wintypes.LPCWSTR, # data description
p_DATA_BLOB, # optional entropy
ctypes.c_void_p, # reserved
ctypes.c_void_p, # POINTER(CRYPTPROTECT_PROMPTSTRUCT), # prompt struct
wintypes.DWORD, # flags
p_DATA_BLOB, # data out
]
_CryptProtectData.restype = wintypes.BOOL
_CryptUnprotectData = ctypes.windll.crypt32.CryptUnprotectData
_CryptUnprotectData.argtypes = [
p_DATA_BLOB, # data in
ctypes.POINTER(wintypes.LPWSTR), # data description
p_DATA_BLOB, # optional entropy
ctypes.c_void_p, # reserved
ctypes.c_void_p, # POINTER(CRYPTPROTECT_PROMPTSTRUCT), # prompt struct
wintypes.DWORD, # flags
p_DATA_BLOB, # data out
]
_CryptUnprotectData.restype = wintypes.BOOL
CRYPTPROTECT_UI_FORBIDDEN = 0x01
def CryptProtectData(
data, description=None, optional_entropy=None,
prompt_struct=None, flags=0,
):
"""
Encrypt data
"""
data_in = DATA_BLOB(data)
entropy = DATA_BLOB(optional_entropy) if optional_entropy else None
data_out = DATA_BLOB()
res = _CryptProtectData(
data_in,
description,
entropy,
None, # reserved
prompt_struct,
flags,
data_out,
)
handle_nonzero_success(res)
res = data_out.get_data()
data_out.free()
return res
def CryptUnprotectData(data, optional_entropy=None, prompt_struct=None, flags=0):
"""
Returns a tuple of (description, data) where description is the
the description that was passed to the CryptProtectData call and
data is the decrypted result.
"""
data_in = DATA_BLOB(data)
entropy = DATA_BLOB(optional_entropy) if optional_entropy else None
data_out = DATA_BLOB()
ptr_description = wintypes.LPWSTR()
res = _CryptUnprotectData(
data_in,
ctypes.byref(ptr_description),
entropy,
None, # reserved
prompt_struct,
flags | CRYPTPROTECT_UI_FORBIDDEN,
data_out,
)
handle_nonzero_success(res)
description = ptr_description.value
if ptr_description.value is not None:
ctypes.windll.kernel32.LocalFree(ptr_description)
res = data_out.get_data()
data_out.free()
return description, res

View file

@ -0,0 +1,242 @@
#!/usr/bin/env python
from __future__ import absolute_import
import sys
import ctypes
import ctypes.wintypes
import six
winreg = six.moves.winreg
from jaraco.ui.editor import EditableFile
from jaraco.windows import error
from jaraco.windows.api import message, environ
from .registry import key_values as registry_key_values
def SetEnvironmentVariable(name, value):
error.handle_nonzero_success(environ.SetEnvironmentVariable(name, value))
def ClearEnvironmentVariable(name):
error.handle_nonzero_success(environ.SetEnvironmentVariable(name, None))
def GetEnvironmentVariable(name):
max_size = 2**15-1
buffer = ctypes.create_unicode_buffer(max_size)
error.handle_nonzero_success(environ.GetEnvironmentVariable(name, buffer, max_size))
return buffer.value
###
class RegisteredEnvironment(object):
"""
Manages the environment variables as set in the Windows Registry.
"""
@classmethod
def show(class_):
for name, value, type in registry_key_values(class_.key):
sys.stdout.write('='.join((name, value)) + '\n')
NoDefault = type('NoDefault', (object,), dict())
@classmethod
def get(class_, name, default=NoDefault):
try:
value, type = winreg.QueryValueEx(class_.key, name)
return value
except WindowsError:
if default is not class_.NoDefault:
return default
raise ValueError("No such key", name)
@classmethod
def get_values_list(class_, name, sep):
res = class_.get(name.upper(), [])
if isinstance(res, six.string_types):
res = res.split(sep)
return res
@classmethod
def set(class_, name, value, options):
# consider opening the key read-only except for here
# key = winreg.OpenKey(class_.key, None, 0, winreg.KEY_WRITE)
# and follow up by closing it.
if not value:
return class_.delete(name)
do_append = options.append or (
name.upper() in ('PATH', 'PATHEXT') and not options.replace
)
if do_append:
sep = ';'
values = class_.get_values_list(name, sep) + [value]
value = sep.join(values)
winreg.SetValueEx(class_.key, name, 0, winreg.REG_EXPAND_SZ, value)
class_.notify()
@classmethod
def add(class_, name, value, sep=';'):
"""
Add a value to a delimited variable, but only when the value isn't
already present.
"""
values = class_.get_values_list(name, sep)
if value in values:
return
new_value = sep.join(values + [value])
winreg.SetValueEx(class_.key, name, 0, winreg.REG_EXPAND_SZ,
new_value)
class_.notify()
@classmethod
def remove_values(class_, name, value_substring, options):
sep = ';'
values = class_.get_values_list(name, sep)
new_values = [
value
for value in values
if value_substring.lower() not in value.lower()
]
values = sep.join(new_values)
winreg.SetValueEx(class_.key, name, 0, winreg.REG_EXPAND_SZ, values)
class_.notify()
@classmethod
def edit(class_, name, value='', options=None):
# value, options ignored
sep = ';'
values = class_.get_values_list(name, sep)
e = EditableFile('\n'.join(values))
e.edit()
if e.changed:
values = sep.join(e.data.strip().split('\n'))
winreg.SetValueEx(class_.key, name, 0, winreg.REG_EXPAND_SZ, values)
class_.notify()
@classmethod
def delete(class_, name):
winreg.DeleteValue(class_.key, name)
class_.notify()
@classmethod
def notify(class_):
"""
Notify other windows that the environment has changed (following
http://support.microsoft.com/kb/104011).
"""
# TODO: Implement Microsoft UIPI (User Interface Privilege Isolation) to
# elevate privilege to system level so the system gets this notification
# for now, this must be run as admin to work as expected
return_val = ctypes.wintypes.DWORD()
res = message.SendMessageTimeout(
message.HWND_BROADCAST,
message.WM_SETTINGCHANGE,
0, # wparam must be null
'Environment',
message.SMTO_ABORTIFHUNG,
5000, # timeout in ms
return_val,
)
error.handle_nonzero_success(res)
class MachineRegisteredEnvironment(RegisteredEnvironment):
path = r'SYSTEM\CurrentControlSet\Control\Session Manager\Environment'
hklm = winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE)
try:
key = winreg.OpenKey(hklm, path, 0,
winreg.KEY_READ | winreg.KEY_WRITE)
except WindowsError:
key = winreg.OpenKey(hklm, path, 0, winreg.KEY_READ)
class UserRegisteredEnvironment(RegisteredEnvironment):
hkcu = winreg.ConnectRegistry(None, winreg.HKEY_CURRENT_USER)
key = winreg.OpenKey(hkcu, 'Environment', 0,
winreg.KEY_READ | winreg.KEY_WRITE)
def trim(s):
from textwrap import dedent
return dedent(s).strip()
def enver(*args):
"""
%prog [<name>=[value]]
To show all environment variables, call with no parameters:
%prog
To Add/Modify/Delete environment variable:
%prog <name>=[value]
If <name> is PATH or PATHEXT, %prog will by default append the value using
a semicolon as a separator. Use -r to disable this behavior or -a to force
it for variables other than PATH and PATHEXT.
If append is prescribed, but the value doesn't exist, the value will be
created.
If there is no value, %prog will delete the <name> environment variable.
i.e. "PATH="
To remove a specific value or values from a semicolon-separated
multi-value variable (such as PATH), use --remove-value.
e.g. enver --remove-value PATH=C:\\Unwanted\\Dir\\In\\Path
Remove-value matches case-insensitive and also matches any substring
so the following would also be sufficient to remove the aforementioned
undesirable dir.
enver --remove-value PATH=UNWANTED
Note that %prog does not affect the current running environment, and can
only affect subsequently spawned applications.
"""
from optparse import OptionParser
parser = OptionParser(usage=trim(enver.__doc__))
parser.add_option(
'-U', '--user-environment',
action='store_const', const=UserRegisteredEnvironment,
default=MachineRegisteredEnvironment,
dest='class_',
help="Use the current user's environment",
)
parser.add_option('-a', '--append',
action='store_true', default=False,
help="Append the value to any existing value (default for PATH and PATHEXT)",)
parser.add_option(
'-r', '--replace',
action='store_true', default=False,
help="Replace any existing value (used to override default append for PATH and PATHEXT)",
)
parser.add_option(
'--remove-value', action='store_true', default=False,
help="Remove any matching values from a semicolon-separated multi-value variable",
)
parser.add_option(
'-e', '--edit', action='store_true', default=False,
help="Edit the value in a local editor",
)
options, args = parser.parse_args(*args)
try:
param = args.pop()
if args:
parser.error("Too many parameters specified")
raise SystemExit(1)
if not '=' in param and not options.edit:
parser.error("Expected <name>= or <name>=<value>")
raise SystemExit(2)
name, sep, value = param.partition('=')
method_name = 'set'
if options.remove_value:
method_name = 'remove_values'
if options.edit:
method_name = 'edit'
method = getattr(options.class_, method_name)
method(name, value, options)
except IndexError:
options.class_.show()
if __name__ == '__main__':
enver()

View file

@ -0,0 +1,77 @@
#!/usr/bin/env python
import sys
import ctypes
import ctypes.wintypes
import six
builtins = six.moves.builtins
def format_system_message(errno):
"""
Call FormatMessage with a system error number to retrieve
the descriptive error message.
"""
# first some flags used by FormatMessageW
ALLOCATE_BUFFER = 0x100
FROM_SYSTEM = 0x1000
# Let FormatMessageW allocate the buffer (we'll free it below)
# Also, let it know we want a system error message.
flags = ALLOCATE_BUFFER | FROM_SYSTEM
source = None
message_id = errno
language_id = 0
result_buffer = ctypes.wintypes.LPWSTR()
buffer_size = 0
arguments = None
bytes = ctypes.windll.kernel32.FormatMessageW(
flags,
source,
message_id,
language_id,
ctypes.byref(result_buffer),
buffer_size,
arguments,
)
# note the following will cause an infinite loop if GetLastError
# repeatedly returns an error that cannot be formatted, although
# this should not happen.
handle_nonzero_success(bytes)
message = result_buffer.value
ctypes.windll.kernel32.LocalFree(result_buffer)
return message
class WindowsError(builtins.WindowsError):
"more info about errors at http://msdn.microsoft.com/en-us/library/ms681381(VS.85).aspx"
def __init__(self, value=None):
if value is None:
value = ctypes.windll.kernel32.GetLastError()
strerror = format_system_message(value)
if sys.version_info > (3,3):
args = 0, strerror, None, value
else:
args = value, strerror
super(WindowsError, self).__init__(*args)
@property
def message(self):
return self.strerror
@property
def code(self):
return self.winerror
def __str__(self):
return self.message
def __repr__(self):
return '{self.__class__.__name__}({self.winerror})'.format(**vars())
def handle_nonzero_success(result):
if result == 0:
raise WindowsError()

View file

@ -0,0 +1,50 @@
import functools
from six.moves import map
import win32api
import win32evtlog
import win32evtlogutil
error = win32api.error # The error the evtlog module raises.
class EventLog(object):
def __init__(self, name="Application", machine_name=None):
self.machine_name = machine_name
self.name = name
self.formatter = functools.partial(
win32evtlogutil.FormatMessage, logType=self.name)
def __enter__(self):
if hasattr(self, 'handle'):
raise ValueError("Overlapping attempts to use this log context")
self.handle = win32evtlog.OpenEventLog(self.machine_name, self.name)
return self
def __exit__(self, *args):
win32evtlog.CloseEventLog(self.handle)
del self.handle
_default_flags = (
win32evtlog.EVENTLOG_BACKWARDS_READ
| win32evtlog.EVENTLOG_SEQUENTIAL_READ
)
def get_records(self, flags=_default_flags):
with self:
while True:
objects = win32evtlog.ReadEventLog(self.handle, flags, 0)
if not objects:
break
for item in objects:
yield item
def __iter__(self):
return self.get_records()
def format_record(self, record):
return self.formatter(record)
def format_records(self, records=None):
if records is None:
records = self.get_records()
return map(self.format_record, records)

View file

@ -0,0 +1,390 @@
#!/usr/bin/env python
from __future__ import print_function
import os
import sys
import operator
import collections
import functools
from ctypes import (POINTER, byref, cast, create_unicode_buffer,
create_string_buffer, windll)
import six
from six.moves import builtins, filter, map
from jaraco.structures import binary
from jaraco.text import local_format as lf
from jaraco.windows.error import WindowsError, handle_nonzero_success
import jaraco.windows.api.filesystem as api
from jaraco.windows import reparse
def mklink():
"""
Like cmd.exe's mklink except it will infer directory status of the
target.
"""
from optparse import OptionParser
parser = OptionParser(usage="usage: %prog [options] link target")
parser.add_option('-d', '--directory',
help="Target is a directory (only necessary if not present)",
action="store_true")
options, args = parser.parse_args()
try:
link, target = args
except ValueError:
parser.error("incorrect number of arguments")
symlink(target, link, options.directory)
sys.stdout.write("Symbolic link created: %(link)s --> %(target)s\n" % vars())
def _is_target_a_directory(link, rel_target):
"""
If creating a symlink from link to a target, determine if target
is a directory (relative to dirname(link)).
"""
target = os.path.join(os.path.dirname(link), rel_target)
return os.path.isdir(target)
def symlink(target, link, target_is_directory = False):
"""
An implementation of os.symlink for Windows (Vista and greater)
"""
target_is_directory = (target_is_directory or
_is_target_a_directory(link, target))
# normalize the target (MS symlinks don't respect forward slashes)
target = os.path.normpath(target)
handle_nonzero_success(api.CreateSymbolicLink(link, target, target_is_directory))
def link(target, link):
"""
Establishes a hard link between an existing file and a new file.
"""
handle_nonzero_success(api.CreateHardLink(link, target, None))
def is_reparse_point(path):
"""
Determine if the given path is a reparse point.
Return False if the file does not exist or the file attributes cannot
be determined.
"""
res = api.GetFileAttributes(path)
return (
res != api.INVALID_FILE_ATTRIBUTES
and bool(res & api.FILE_ATTRIBUTE_REPARSE_POINT)
)
def islink(path):
"Determine if the given path is a symlink"
return is_reparse_point(path) and is_symlink(path)
def _patch_path(path):
"""
Paths have a max length of api.MAX_PATH characters (260). If a target path
is longer than that, it needs to be made absolute and prepended with
\\?\ in order to work with API calls.
See http://msdn.microsoft.com/en-us/library/aa365247%28v=vs.85%29.aspx for
details.
"""
if path.startswith('\\\\?\\'): return path
abs_path = os.path.abspath(path)
if not abs_path[1] == ':':
# python doesn't include the drive letter, but \\?\ requires it
abs_path = os.getcwd()[:2] + abs_path
return '\\\\?\\' + abs_path
def is_symlink(path):
"""
Assuming path is a reparse point, determine if it's a symlink.
"""
path = _patch_path(path)
try:
return _is_symlink(next(find_files(path)))
except WindowsError as orig_error:
tmpl = "Error accessing {path}: {orig_error.message}"
raise builtins.WindowsError(lf(tmpl))
def _is_symlink(find_data):
return find_data.reserved[0] == api.IO_REPARSE_TAG_SYMLINK
def find_files(spec):
"""
A pythonic wrapper around the FindFirstFile/FindNextFile win32 api.
>>> root_files = tuple(find_files(r'c:\*'))
>>> len(root_files) > 1
True
>>> root_files[0].filename == root_files[1].filename
False
This test might fail on a non-standard installation
>>> 'Windows' in (fd.filename for fd in root_files)
True
"""
fd = api.WIN32_FIND_DATA()
handle = api.FindFirstFile(spec, byref(fd))
while True:
if handle == api.INVALID_HANDLE_VALUE:
raise WindowsError()
yield fd
fd = api.WIN32_FIND_DATA()
res = api.FindNextFile(handle, byref(fd))
if res == 0: # error
error = WindowsError()
if error.code == api.ERROR_NO_MORE_FILES:
break
else: raise error
# todo: how to close handle when generator is destroyed?
# hint: catch GeneratorExit
windll.kernel32.FindClose(handle)
def get_final_path(path):
"""
For a given path, determine the ultimate location of that path.
Useful for resolving symlink targets.
This functions wraps the GetFinalPathNameByHandle from the Windows
SDK.
Note, this function fails if a handle cannot be obtained (such as
for C:\Pagefile.sys on a stock windows system). Consider using
trace_symlink_target instead.
"""
desired_access = api.NULL
share_mode = api.FILE_SHARE_READ | api.FILE_SHARE_WRITE | api.FILE_SHARE_DELETE
security_attributes = api.LPSECURITY_ATTRIBUTES() # NULL pointer
hFile = api.CreateFile(
path,
desired_access,
share_mode,
security_attributes,
api.OPEN_EXISTING,
api.FILE_FLAG_BACKUP_SEMANTICS,
api.NULL,
)
if hFile == api.INVALID_HANDLE_VALUE:
raise WindowsError()
buf_size = api.GetFinalPathNameByHandle(hFile, api.LPWSTR(), 0, api.VOLUME_NAME_DOS)
handle_nonzero_success(buf_size)
buf = create_unicode_buffer(buf_size)
result_length = api.GetFinalPathNameByHandle(hFile, buf, len(buf), api.VOLUME_NAME_DOS)
assert result_length < len(buf)
handle_nonzero_success(result_length)
handle_nonzero_success(api.CloseHandle(hFile))
return buf[:result_length]
def GetBinaryType(filepath):
res = api.DWORD()
handle_nonzero_success(api._GetBinaryType(filepath, res))
return res
def _make_null_terminated_list(obs):
obs = _makelist(obs)
if obs is None: return
return u'\x00'.join(obs) + u'\x00\x00'
def _makelist(ob):
if ob is None: return
if not isinstance(ob, (list, tuple, set)):
return [ob]
return ob
def SHFileOperation(operation, from_, to=None, flags=[]):
flags = functools.reduce(operator.or_, flags, 0)
from_ = _make_null_terminated_list(from_)
to = _make_null_terminated_list(to)
params = api.SHFILEOPSTRUCT(0, operation, from_, to, flags)
res = api._SHFileOperation(params)
if res != 0:
raise RuntimeError("SHFileOperation returned %d" % res)
def join(*paths):
r"""
Wrapper around os.path.join that works with Windows drive letters.
>>> join('d:\\foo', '\\bar')
'd:\\bar'
"""
paths_with_drives = map(os.path.splitdrive, paths)
drives, paths = zip(*paths_with_drives)
# the drive we care about is the last one in the list
drive = next(filter(None, reversed(drives)), '')
return os.path.join(drive, os.path.join(*paths))
def resolve_path(target, start=os.path.curdir):
r"""
Find a path from start to target where target is relative to start.
>>> orig_wd = os.getcwd()
>>> os.chdir('c:\\windows') # so we know what the working directory is
>>> findpath('d:\\')
'd:\\'
>>> findpath('d:\\', 'c:\\windows')
'd:\\'
>>> findpath('\\bar', 'd:\\')
'd:\\bar'
>>> findpath('\\bar', 'd:\\foo') # fails with '\\bar'
'd:\\bar'
>>> findpath('bar', 'd:\\foo')
'd:\\foo\\bar'
>>> findpath('\\baz', 'd:\\foo\\bar') # fails with '\\baz'
'd:\\baz'
>>> os.path.abspath(findpath('\\bar'))
'c:\\bar'
>>> os.path.abspath(findpath('bar'))
'c:\\windows\\bar'
>>> findpath('..', 'd:\\foo\\bar')
'd:\\foo'
The parent of the root directory is the root directory.
>>> findpath('..', 'd:\\')
'd:\\'
"""
return os.path.normpath(join(start, target))
findpath = resolve_path
def trace_symlink_target(link):
"""
Given a file that is known to be a symlink, trace it to its ultimate
target.
Raises TargetNotPresent when the target cannot be determined.
Raises ValueError when the specified link is not a symlink.
"""
if not is_symlink(link):
raise ValueError("link must point to a symlink on the system")
while is_symlink(link):
orig = os.path.dirname(link)
link = readlink(link)
link = resolve_path(link, orig)
return link
def readlink(link):
"""
readlink(link) -> target
Return a string representing the path to which the symbolic link points.
"""
handle = api.CreateFile(
link,
0,
0,
None,
api.OPEN_EXISTING,
api.FILE_FLAG_OPEN_REPARSE_POINT | api.FILE_FLAG_BACKUP_SEMANTICS,
None,
)
if handle == api.INVALID_HANDLE_VALUE:
raise WindowsError()
res = reparse.DeviceIoControl(handle, api.FSCTL_GET_REPARSE_POINT, None, 10240)
bytes = create_string_buffer(res)
p_rdb = cast(bytes, POINTER(api.REPARSE_DATA_BUFFER))
rdb = p_rdb.contents
if not rdb.tag == api.IO_REPARSE_TAG_SYMLINK:
raise RuntimeError("Expected IO_REPARSE_TAG_SYMLINK, but got %d" % rdb.tag)
handle_nonzero_success(api.CloseHandle(handle))
return rdb.get_substitute_name()
def patch_os_module():
"""
jaraco.windows provides the os.symlink and os.readlink functions.
Monkey-patch the os module to include them if not present.
"""
if not hasattr(os, 'symlink'):
os.symlink = symlink
os.path.islink = islink
if not hasattr(os, 'readlink'):
os.readlink = readlink
def find_symlinks(root):
for dirpath, dirnames, filenames in os.walk(root):
for name in dirnames + filenames:
pathname = os.path.join(dirpath, name)
if is_symlink(pathname):
yield pathname
# don't traverse symlinks
if name in dirnames:
dirnames.remove(name)
def find_symlinks_cmd():
"""
%prog [start-path]
Search the specified path (defaults to the current directory) for symlinks,
printing the source and target on each line.
"""
from optparse import OptionParser
from textwrap import dedent
parser = OptionParser(usage=dedent(find_symlinks_cmd.__doc__).strip())
options, args = parser.parse_args()
if not args: args = ['.']
root = args.pop()
if args:
parser.error("unexpected argument(s)")
try:
for symlink in find_symlinks(root):
target = readlink(symlink)
dir = ['', 'D'][os.path.isdir(symlink)]
msg = '{dir:2}{symlink} --> {target}'.format(**locals())
print(msg)
except KeyboardInterrupt:
pass
@six.add_metaclass(binary.BitMask)
class FileAttributes(int):
archive = 0x20
compressed = 0x800
hidden = 0x2
device = 0x40
directory = 0x10
encrypted = 0x4000
normal = 0x80
not_content_indexed = 0x2000
offline = 0x1000
read_only = 0x1
reparse_point = 0x400
sparse_file = 0x200
system = 0x4
temporary = 0x100
virtual = 0x10000
def GetFileAttributes(filepath):
attrs = api.GetFileAttributes(filepath)
if attrs == api.INVALID_FILE_ATTRIBUTES:
raise WindowsError()
return FileAttributes(attrs)
def SetFileAttributes(filepath, *attrs):
"""
Set file attributes. e.g.:
SetFileAttributes('C:\\foo', 'hidden')
Each attr must be either a numeric value, a constant defined in
jaraco.windows.filesystem.api, or one of the nice names
defined in this function.
"""
nice_names = collections.defaultdict(
lambda key: key,
hidden = 'FILE_ATTRIBUTE_HIDDEN',
read_only = 'FILE_ATTRIBUTE_READONLY',
)
flags = (getattr(api, nice_names[attr], attr) for attr in attrs)
flags = functools.reduce(operator.or_, flags)
handle_nonzero_success(api.SetFileAttributes(filepath, flags))

View file

@ -0,0 +1,255 @@
# -*- coding: UTF-8 -*-
"""
FileChange
Classes and routines for monitoring the file system for changes.
Copyright © 2004, 2011, 2013 Jason R. Coombs
"""
from __future__ import print_function
import os
import sys
import datetime
import re
from threading import Thread
import itertools
import logging
from more_itertools.recipes import consume
import jaraco.text
import jaraco.windows.api.filesystem as fs
from jaraco.windows.api import event
log = logging.getLogger(__name__)
class NotifierException(Exception):
pass
class FileFilter(object):
def set_root(self, root):
self.root = root
def _get_file_path(self, filename):
try:
filename = os.path.join(self.root, filename)
except AttributeError: pass
return filename
class ModifiedTimeFilter(FileFilter):
"""
Returns true for each call where the modified time of the file is after
the cutoff time.
"""
def __init__(self, cutoff):
self.cutoff = cutoff
def __call__(self, file):
filepath = self._get_file_path(file)
last_mod = datetime.datetime.utcfromtimestamp(
os.stat(filepath).st_mtime)
log.debug('{filepath} last modified at {last_mod}.'.format(**vars()))
return last_mod > self.cutoff
class PatternFilter(FileFilter):
"""
Filter that returns True for files that match pattern (a regular
expression).
"""
def __init__(self, pattern):
self.pattern = (
re.compile(pattern) if isinstance(pattern, basestring)
else pattern
)
def __call__(self, file):
return bool(self.pattern.match(file, re.I))
class GlobFilter(PatternFilter):
"""
Filter that returns True for files that match the pattern (a glob
expression.
"""
def __init__(self, expression):
super(GlobFilter, self).__init__(
self.convert_file_pattern(expression))
@staticmethod
def convert_file_pattern(p):
r"""
converts a filename specification (such as c:\*.*) to an equivelent
regular expression
>>> GlobFilter.convert_file_pattern('/*')
'/.*'
"""
subs = (('\\', '\\\\'), ('.', '\\.'), ('*', '.*'), ('?', '.'))
return jaraco.text.multi_substitution(*subs)(p)
class AggregateFilter(FileFilter):
"""
This file filter will aggregate the filters passed to it, and when called,
will return the results of each filter ANDed together.
"""
def __init__(self, *filters):
self.filters = filters
def set_root(self, root):
consume(f.set_root(root) for f in self.filters)
def __call__(self, file):
return all(fil(file) for fil in self.filters)
class OncePerModFilter(FileFilter):
def __init__(self):
self.history = list()
def __call__(self, file):
file = os.path.join(self.root, file)
key = file, os.stat(file).st_mtime
result = key not in self.history
self.history.append(key)
if len(self.history) > 100:
del self.history[-50:]
return result
def files_with_path(files, path):
return (os.path.join(path, file) for file in files)
def get_file_paths(walk_result):
root, dirs, files = walk_result
return files_with_path(files, root)
class Notifier(object):
def __init__(self, root = '.', filters = []):
# assign the root, verify it exists
self.root = root
if not os.path.isdir(self.root):
raise NotifierException(
'Root directory "%s" does not exist' % self.root)
self.filters = filters
self.watch_subtree = False
self.quit_event = event.CreateEvent(None, 0, 0, None)
self.opm_filter = OncePerModFilter()
def __del__(self):
try:
fs.FindCloseChangeNotification(self.hChange)
except: pass
def _get_change_handle(self):
# set up to monitor the directory tree specified
self.hChange = fs.FindFirstChangeNotification(
self.root,
self.watch_subtree,
fs.FILE_NOTIFY_CHANGE_LAST_WRITE,
)
# make sure it worked; if not, bail
INVALID_HANDLE_VALUE = fs.INVALID_HANDLE_VALUE
if self.hChange == INVALID_HANDLE_VALUE:
raise NotifierException('Could not set up directory change '
'notification')
@staticmethod
def _filtered_walk(path, file_filter):
"""
static method that calls os.walk, but filters out
anything that doesn't match the filter
"""
for root, dirs, files in os.walk(path):
log.debug('looking in %s', root)
log.debug('files is %s', files)
file_filter.set_root(root)
files = filter(file_filter, files)
log.debug('filtered files is %s', files)
yield (root, dirs, files)
def quit(self):
event.SetEvent(self.quit_event)
class BlockingNotifier(Notifier):
@staticmethod
def wait_results(*args):
""" calls WaitForMultipleObjects repeatedly with args """
return itertools.starmap(
event.WaitForMultipleObjects,
itertools.repeat(args))
def get_changed_files(self):
self._get_change_handle()
check_time = datetime.datetime.utcnow()
# block (sleep) until something changes in the
# target directory or a quit is requested.
# timeout so we can catch keyboard interrupts or other exceptions
events = (self.hChange, self.quit_event)
for result in self.wait_results(events, False, 1000):
if result == event.WAIT_TIMEOUT:
continue
index = result - event.WAIT_OBJECT_0
if events[index] is self.quit_event:
# quit was received; stop yielding results
return
# something has changed.
log.debug('Change notification received')
fs.FindNextChangeNotification(self.hChange)
next_check_time = datetime.datetime.utcnow()
log.debug('Looking for all files changed after %s', check_time)
for file in self.find_files_after(check_time):
yield file
check_time = next_check_time
def find_files_after(self, cutoff):
mtf = ModifiedTimeFilter(cutoff)
af = AggregateFilter(mtf, self.opm_filter, *self.filters)
results = Notifier._filtered_walk(self.root, af)
results = itertools.imap(get_file_paths, results)
if self.watch_subtree:
result = itertools.chain(*results)
else:
result = next(results)
return result
class ThreadedNotifier(BlockingNotifier, Thread):
r"""
ThreadedNotifier provides a simple interface that calls the handler
for each file rooted in root that passes the filters. It runs as its own
thread, so must be started as such.
>>> notifier = ThreadedNotifier('c:\\', handler = StreamHandler()) # doctest: +SKIP
>>> notifier.start() # doctest: +SKIP
C:\Autoexec.bat changed.
"""
def __init__(self, root = '.', filters = [], handler = lambda file: None):
# init notifier stuff
BlockingNotifier.__init__(self, root, filters)
# init thread stuff
Thread.__init__(self)
# set it as a daemon thread so that it doesn't block waiting to close.
# I tried setting __del__(self) to .quit(), but unfortunately, there
# are references to this object in the win32api stuff, so __del__
# never gets called.
self.setDaemon(True)
self.handle = handler
def run(self):
for file in self.get_changed_files():
self.handle(file)
class StreamHandler(object):
"""
StreamHandler: a sample handler object for use with the threaded
notifier that will announce by writing to the supplied stream
(stdout by default) the name of the file.
"""
def __init__(self, output = sys.stdout):
self.output = output
def __call__(self, filename):
self.output.write('%s changed.\n' % filename)

119
libs/jaraco/windows/inet.py Normal file
View file

@ -0,0 +1,119 @@
"""
Some routines for retrieving the addresses from the local
network config.
"""
from __future__ import print_function
import itertools
import ctypes
from jaraco.windows.api import errors, inet
def GetAdaptersAddresses():
size = ctypes.c_ulong()
res = inet.GetAdaptersAddresses(0,0,None, None,size)
if res != errors.ERROR_BUFFER_OVERFLOW:
raise RuntimeError("Error getting structure length (%d)" % res)
print(size.value)
pointer_type = ctypes.POINTER(inet.IP_ADAPTER_ADDRESSES)
buffer = ctypes.create_string_buffer(size.value)
struct_p = ctypes.cast(buffer, pointer_type)
res = inet.GetAdaptersAddresses(0,0,None, struct_p, size)
if res != errors.NO_ERROR:
raise RuntimeError("Error retrieving table (%d)" % res)
while struct_p:
yield struct_p.contents
struct_p = struct_p.contents.next
class AllocatedTable(object):
"""
Both the interface table and the ip address table use the same
technique to store arrays of structures of variable length. This
base class captures the functionality to retrieve and access those
table entries.
The subclass needs to define three class attributes:
method: a callable that takes three arguments - a pointer to
the structure, the length of the data contained by the
structure, and a boolean of whether the result should
be sorted.
structure: a C structure defininition that describes the table
format.
row_structure: a C structure definition that describes the row
format.
"""
def __get_table_size(self):
"""
Retrieve the size of the buffer needed by calling the method
with a null pointer and length of zero. This should trigger an
insufficient buffer error and return the size needed for the
buffer.
"""
length = ctypes.wintypes.DWORD()
res = self.method(None, length, False)
if res != errors.ERROR_INSUFFICIENT_BUFFER:
raise RuntimeError("Error getting table length (%d)" % res)
return length.value
def get_table(self):
"""
Get the table
"""
buffer_length = self.__get_table_size()
returned_buffer_length = ctypes.wintypes.DWORD(buffer_length)
buffer = ctypes.create_string_buffer(buffer_length)
pointer_type = ctypes.POINTER(self.structure)
table_p = ctypes.cast(buffer, pointer_type)
res = self.method(table_p, returned_buffer_length, False)
if res != errors.NO_ERROR:
raise RuntimeError("Error retrieving table (%d)" % res)
return table_p.contents
@property
def entries(self):
"""
Using the table structure, return the array of entries based
on the table size.
"""
table = self.get_table()
entries_array = self.row_structure*table.num_entries
pointer_type = ctypes.POINTER(entries_array)
return ctypes.cast(table.entries, pointer_type).contents
class InterfaceTable(AllocatedTable):
method = inet.GetIfTable
structure = inet.MIB_IFTABLE
row_structure = inet.MIB_IFROW
class AddressTable(AllocatedTable):
method = inet.GetIpAddrTable
structure = inet.MIB_IPADDRTABLE
row_structure = inet.MIB_IPADDRROW
class AddressManager(object):
@staticmethod
def hardware_address_to_string(addr):
hex_bytes = (byte.encode('hex') for byte in addr)
return ':'.join(hex_bytes)
def get_host_mac_address_strings(self):
return (self.hardware_address_to_string(addr)
for addr in self.get_host_mac_addresses())
def get_host_ip_address_strings(self):
return itertools.imap(str, self.get_host_ip_addresses())
def get_host_mac_addresses(self):
return (
entry.physical_address
for entry in InterfaceTable().entries
)
def get_host_ip_addresses(self):
return (
entry.address
for entry in AddressTable().entries
)

View file

@ -0,0 +1,20 @@
import ctypes
from .api import library
def find_lib(lib):
r"""
Find the DLL for a given library.
Accepts a string or loaded module
>>> print(find_lib('kernel32').lower())
c:\windows\system32\kernel32.dll
"""
if isinstance(lib, str):
lib = getattr(ctypes.windll, lib)
size = 1024
result = ctypes.create_unicode_buffer(size)
library.GetModuleFileName(lib._handle, result, size)
return result.value

View file

@ -0,0 +1,28 @@
import ctypes
from ctypes import WinError
from .api import memory
class LockedMemory(object):
def __init__(self, handle):
self.handle = handle
def __enter__(self):
self.data_ptr = memory.GlobalLock(self.handle)
if not self.data_ptr:
del self.data_ptr
raise WinError()
return self
def __exit__(self, *args):
memory.GlobalUnlock(self.handle)
del self.data_ptr
@property
def data(self):
with self:
return ctypes.string_at(self.data_ptr, self.size)
@property
def size(self):
return memory.GlobalSize(self.data_ptr)

View file

@ -0,0 +1,62 @@
import ctypes.wintypes
import six
from .error import handle_nonzero_success
from .api import memory
class MemoryMap(object):
"""
A memory map object which can have security attributes overridden.
"""
def __init__(self, name, length, security_attributes=None):
self.name = name
self.length = length
self.security_attributes = security_attributes
self.pos = 0
def __enter__(self):
p_SA = (
ctypes.byref(self.security_attributes)
if self.security_attributes else None
)
INVALID_HANDLE_VALUE = -1
PAGE_READWRITE = 0x4
FILE_MAP_WRITE = 0x2
filemap = ctypes.windll.kernel32.CreateFileMappingW(
INVALID_HANDLE_VALUE, p_SA, PAGE_READWRITE, 0, self.length,
six.text_type(self.name))
handle_nonzero_success(filemap)
if filemap == INVALID_HANDLE_VALUE:
raise Exception("Failed to create file mapping")
self.filemap = filemap
self.view = memory.MapViewOfFile(filemap, FILE_MAP_WRITE, 0, 0, 0)
return self
def seek(self, pos):
self.pos = pos
def write(self, msg):
assert isinstance(msg, bytes)
n = len(msg)
if self.pos + n >= self.length: # A little safety.
raise ValueError("Refusing to write %d bytes" % n)
dest = self.view + self.pos
length = ctypes.c_size_t(n)
ctypes.windll.kernel32.RtlMoveMemory(dest, msg, length)
self.pos += n
def read(self, n):
"""
Read n bytes from mapped view.
"""
out = ctypes.create_string_buffer(n)
source = self.view + self.pos
length = ctypes.c_size_t(n)
ctypes.windll.kernel32.RtlMoveMemory(out, source, length)
self.pos += n
return out.raw
def __exit__(self, exc_type, exc_val, tb):
ctypes.windll.kernel32.UnmapViewOfFile(self.view)
ctypes.windll.kernel32.CloseHandle(self.filemap)

View file

@ -0,0 +1,54 @@
# -*- coding: UTF-8 -*-
"""cookies.py
Cookie support utilities
"""
import os
import itertools
import six
class CookieMonster(object):
"Read cookies out of a user's IE cookies file"
@property
def cookie_dir(self):
import _winreg as winreg
key = winreg.OpenKeyEx(winreg.HKEY_CURRENT_USER, 'Software'
'\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders')
cookie_dir, type = winreg.QueryValueEx(key, 'Cookies')
return cookie_dir
def entries(self, filename):
with open(os.path.join(self.cookie_dir, filename)) as cookie_file:
while True:
entry = itertools.takewhile(self.is_not_cookie_delimiter,
cookie_file)
entry = list(map(six.text_type.rstrip, entry))
if not entry: break
cookie = self.make_cookie(*entry)
yield cookie
@staticmethod
def is_not_cookie_delimiter(s):
return s != '*\n'
@staticmethod
def make_cookie(key, value, domain, flags, ExpireLow, ExpireHigh,
CreateLow, CreateHigh):
expires = (int(ExpireHigh) << 32) | int(ExpireLow)
created = (int(CreateHigh) << 32) | int(CreateLow)
flags = int(flags)
domain, sep, path = domain.partition('/')
path = '/' + path
return dict(
key=key,
value=value,
domain=domain,
flags=flags,
expires=expires,
created=created,
path=path,
)

View file

@ -0,0 +1,28 @@
"""
API hooks for network stuff.
"""
__all__ = ('AddConnection')
from jaraco.windows.error import WindowsError
from .api import net
def AddConnection(remote_name, type=net.RESOURCETYPE_ANY, local_name=None,
provider_name=None, user=None, password=None, flags=0):
resource = net.NETRESOURCE(
type=type,
remote_name=remote_name,
local_name=local_name,
provider_name=provider_name,
# WNetAddConnection2 ignores the other members of NETRESOURCE
)
result = net.WNetAddConnection2(
resource,
password,
user,
flags,
)
if result != 0:
raise WindowsError(result)

View file

@ -0,0 +1,70 @@
#-*- coding: utf-8 -*-
from __future__ import print_function
import itertools
import contextlib
import ctypes
from more_itertools.recipes import consume, unique_justseen
try:
import wmi as wmilib
except ImportError:
pass
from jaraco.windows.error import handle_nonzero_success
from .api import power
def GetSystemPowerStatus():
stat = power.SYSTEM_POWER_STATUS()
handle_nonzero_success(GetSystemPowerStatus(stat))
return stat
def _init_power_watcher():
global power_watcher
if 'power_watcher' not in globals():
wmi = wmilib.WMI()
query = 'SELECT * from Win32_PowerManagementEvent'
power_watcher = wmi.ExecNotificationQuery(query)
def get_power_management_events():
_init_power_watcher()
while True:
yield power_watcher.NextEvent()
def wait_for_power_status_change():
EVT_POWER_STATUS_CHANGE = 10
def not_power_status_change(evt):
return evt.EventType != EVT_POWER_STATUS_CHANGE
events = get_power_management_events()
consume(itertools.takewhile(not_power_status_change, events))
def get_unique_power_states():
"""
Just like get_power_states, but ensures values are returned only
when the state changes.
"""
return unique_justseen(get_power_states())
def get_power_states():
"""
Continuously return the power state of the system when it changes.
This function will block indefinitely if the power state never
changes.
"""
while True:
state = GetSystemPowerStatus()
yield state.ac_line_status_string
wait_for_power_status_change()
@contextlib.contextmanager
def no_sleep():
"""
Context that prevents the computer from going to sleep.
"""
mode = power.ES.continuous | power.ES.system_required
handle_nonzero_success(power.SetThreadExecutionState(mode))
try:
yield
finally:
handle_nonzero_success(power.SetThreadExecutionState(power.ES.continuous))

View file

@ -0,0 +1,122 @@
from __future__ import print_function
import ctypes
from ctypes import wintypes
from .api import security
from .api import privilege
from .api import process
def get_process_token():
"""
Get the current process token
"""
token = wintypes.HANDLE()
res = process.OpenProcessToken(process.GetCurrentProcess(), process.TOKEN_ALL_ACCESS, token)
if not res > 0: raise RuntimeError("Couldn't get process token")
return token
def get_symlink_luid():
"""
Get the LUID for the SeCreateSymbolicLinkPrivilege
"""
symlink_luid = privilege.LUID()
res = privilege.LookupPrivilegeValue(None, "SeCreateSymbolicLinkPrivilege", symlink_luid)
if not res > 0: raise RuntimeError("Couldn't lookup privilege value")
return symlink_luid
def get_privilege_information():
"""
Get all privileges associated with the current process.
"""
# first call with zero length to determine what size buffer we need
return_length = wintypes.DWORD()
params = [
get_process_token(),
privilege.TOKEN_INFORMATION_CLASS.TokenPrivileges,
None,
0,
return_length,
]
res = privilege.GetTokenInformation(*params)
# assume we now have the necessary length in return_length
buffer = ctypes.create_string_buffer(return_length.value)
params[2] = buffer
params[3] = return_length.value
res = privilege.GetTokenInformation(*params)
assert res > 0, "Error in second GetTokenInformation (%d)" % res
privileges = ctypes.cast(buffer, ctypes.POINTER(privilege.TOKEN_PRIVILEGES)).contents
return privileges
def report_privilege_information():
"""
Report all privilege information assigned to the current process.
"""
privileges = get_privilege_information()
print("found {0} privileges".format(privileges.count))
tuple(map(print, privileges))
def enable_symlink_privilege():
"""
Try to assign the symlink privilege to the current process token.
Return True if the assignment is successful.
"""
# create a space in memory for a TOKEN_PRIVILEGES structure
# with one element
size = ctypes.sizeof(privilege.TOKEN_PRIVILEGES)
size += ctypes.sizeof(privilege.LUID_AND_ATTRIBUTES)
buffer = ctypes.create_string_buffer(size)
tp = ctypes.cast(buffer, ctypes.POINTER(privilege.TOKEN_PRIVILEGES)).contents
tp.count = 1
tp.get_array()[0].enable()
tp.get_array()[0].LUID = get_symlink_luid()
token = get_process_token()
res = privilege.AdjustTokenPrivileges(token, False, tp, 0, None, None)
if res == 0:
raise RuntimeError("Error in AdjustTokenPrivileges")
ERROR_NOT_ALL_ASSIGNED = 1300
return ctypes.windll.kernel32.GetLastError() != ERROR_NOT_ALL_ASSIGNED
class PolicyHandle(wintypes.HANDLE):
pass
class LSA_UNICODE_STRING(ctypes.Structure):
_fields_ = [
('length', ctypes.c_ushort),
('max_length', ctypes.c_ushort),
('buffer', ctypes.wintypes.LPWSTR),
]
def OpenPolicy(system_name, object_attributes, access_mask):
policy = PolicyHandle()
raise NotImplementedError("Need to construct structures for parameters "
"(see http://msdn.microsoft.com/en-us/library/windows/desktop/aa378299%28v=vs.85%29.aspx)")
res = ctypes.windll.advapi32.LsaOpenPolicy(system_name, object_attributes,
access_mask, ctypes.byref(policy))
assert res == 0, "Error status {res}".format(**vars())
return policy
def grant_symlink_privilege(who, machine=''):
"""
Grant the 'create symlink' privilege to who.
Based on http://support.microsoft.com/kb/132958
"""
flags = security.POLICY_CREATE_ACCOUNT | security.POLICY_LOOKUP_NAMES
policy = OpenPolicy(machine, flags)
return policy
def main():
assigned = enable_symlink_privilege()
msg = ['failure', 'success'][assigned]
print("Symlink privilege assignment completed with {0}".format(msg))
if __name__ == '__main__': main()

View file

@ -0,0 +1,18 @@
from itertools import count
import six
winreg = six.moves.winreg
def key_values(key):
for index in count():
try:
yield winreg.EnumValue(key, index)
except WindowsError:
break
def key_subkeys(key):
for index in count():
try:
yield winreg.EnumKey(key, index)
except WindowsError:
break

View file

@ -0,0 +1,33 @@
from __future__ import division
import ctypes.wintypes
from .error import handle_nonzero_success
from .api import filesystem
def DeviceIoControl(device, io_control_code, in_buffer, out_buffer, overlapped=None):
if overlapped is not None:
raise NotImplementedError("overlapped handles not yet supported")
if isinstance(out_buffer, int):
out_buffer = ctypes.create_string_buffer(out_buffer)
in_buffer_size = len(in_buffer) if in_buffer is not None else 0
out_buffer_size = len(out_buffer)
assert isinstance(out_buffer, ctypes.Array)
returned_bytes = ctypes.wintypes.DWORD()
res = filesystem.DeviceIoControl(
device,
io_control_code,
in_buffer, in_buffer_size,
out_buffer, out_buffer_size,
returned_bytes,
overlapped,
)
handle_nonzero_success(res)
handle_nonzero_success(returned_bytes)
return out_buffer[:returned_bytes.value]

View file

@ -0,0 +1,58 @@
import ctypes.wintypes
from jaraco.windows.error import handle_nonzero_success
from .api import security
def GetTokenInformation(token, information_class):
"""
Given a token, get the token information for it.
"""
data_size = ctypes.wintypes.DWORD()
ctypes.windll.advapi32.GetTokenInformation(token, information_class.num,
0, 0, ctypes.byref(data_size))
data = ctypes.create_string_buffer(data_size.value)
handle_nonzero_success(ctypes.windll.advapi32.GetTokenInformation(token,
information_class.num,
ctypes.byref(data), ctypes.sizeof(data),
ctypes.byref(data_size)))
return ctypes.cast(data, ctypes.POINTER(security.TOKEN_USER)).contents
def OpenProcessToken(proc_handle, access):
result = ctypes.wintypes.HANDLE()
proc_handle = ctypes.wintypes.HANDLE(proc_handle)
handle_nonzero_success(ctypes.windll.advapi32.OpenProcessToken(
proc_handle, access, ctypes.byref(result)))
return result
def get_current_user():
"""
Return a TOKEN_USER for the owner of this process.
"""
process = OpenProcessToken(
ctypes.windll.kernel32.GetCurrentProcess(),
security.TokenAccess.TOKEN_QUERY,
)
return GetTokenInformation(process, security.TOKEN_USER)
def get_security_attributes_for_user(user=None):
"""
Return a SECURITY_ATTRIBUTES structure with the SID set to the
specified user (uses current user if none is specified).
"""
if user is None:
user = get_current_user()
assert isinstance(user, security.TOKEN_USER), "user must be TOKEN_USER instance"
SD = security.SECURITY_DESCRIPTOR()
SA = security.SECURITY_ATTRIBUTES()
# by attaching the actual security descriptor, it will be garbage-
# collected with the security attributes
SA.descriptor = SD
SA.bInheritHandle = 1
ctypes.windll.advapi32.InitializeSecurityDescriptor(ctypes.byref(SD),
security.SECURITY_DESCRIPTOR.REVISION)
ctypes.windll.advapi32.SetSecurityDescriptorOwner(ctypes.byref(SD),
user.SID, 0)
return SA

View file

@ -0,0 +1,227 @@
"""
Windows Services support for controlling Windows Services.
Based on http://code.activestate.com/recipes/115875-controlling-windows-services/
"""
from __future__ import print_function
import sys
import time
import win32api
import win32con
import win32service
class Service(object):
"""
The Service Class is used for controlling Windows
services. Just pass the name of the service you wish to control to the
class instance and go from there. For example, if you want to control
the Workstation service try this:
from jaraco.windows import services
workstation = services.Service("Workstation")
workstation.start()
workstation.fetchstatus("running", 10)
workstation.stop()
workstation.fetchstatus("stopped")
Creating an instance of the Service class is done by passing the name of
the service as it appears in the Management Console or the short name as
it appears in the registry. Mixed case is ok.
cvs = services.Service("CVS NT Service 1.11.1.2 (Build 41)")
or
cvs = services.Service("cvs")
If needing remote service control try this:
cvs = services.Service("cvs", r"\\CVS_SERVER")
or
cvs = services.Service("cvs", "\\\\CVS_SERVER")
The Service Class supports these methods:
start: Starts service.
stop: Stops service.
restart: Stops and restarts service.
pause: Pauses service (Only if service supports feature).
resume: Resumes service that has been paused.
status: Queries current status of service.
fetchstatus: Continually queries service until requested status(STARTING, RUNNING,
STOPPING & STOPPED) is met or timeout value(in seconds) reached.
Default timeout value is infinite.
infotype: Queries service for process type. (Single, shared and/or
interactive process)
infoctrl: Queries control information about a running service.
i.e. Can it be paused, stopped, etc?
infostartup: Queries service Startup type. (Boot, System,
Automatic, Manual, Disabled)
setstartup Changes/sets Startup type. (Boot, System,
Automatic, Manual, Disabled)
getname: Gets the long and short service names used by Windowin32service.
(Generally used for internal purposes)
"""
def __init__(self, service, machinename=None, dbname=None):
self.userv = service
self.scmhandle = win32service.OpenSCManager(machinename, dbname, win32service.SC_MANAGER_ALL_ACCESS)
self.sserv, self.lserv = self.getname()
if (self.sserv or self.lserv) is None:
sys.exit()
self.handle = win32service.OpenService(self.scmhandle, self.sserv, win32service.SERVICE_ALL_ACCESS)
self.sccss = "SYSTEM\\CurrentControlSet\\Services\\"
def start(self):
win32service.StartService(self.handle, None)
def stop(self):
self.stat = win32service.ControlService(self.handle, win32service.SERVICE_CONTROL_STOP)
def restart(self):
self.stop()
self.fetchstatus("STOPPED")
self.start()
def pause(self):
self.stat = win32service.ControlService(self.handle, win32service.SERVICE_CONTROL_PAUSE)
def resume(self):
self.stat = win32service.ControlService(self.handle, win32service.SERVICE_CONTROL_CONTINUE)
def status(self, prn = 0):
self.stat = win32service.QueryServiceStatus(self.handle)
if self.stat[1]==win32service.SERVICE_STOPPED:
if prn == 1:
print("The %s service is stopped." % self.lserv)
else:
return "STOPPED"
elif self.stat[1]==win32service.SERVICE_START_PENDING:
if prn == 1:
print("The %s service is starting." % self.lserv)
else:
return "STARTING"
elif self.stat[1]==win32service.SERVICE_STOP_PENDING:
if prn == 1:
print("The %s service is stopping." % self.lserv)
else:
return "STOPPING"
elif self.stat[1]==win32service.SERVICE_RUNNING:
if prn == 1:
print("The %s service is running." % self.lserv)
else:
return "RUNNING"
def fetchstatus(self, fstatus, timeout=None):
self.fstatus = fstatus.upper()
if timeout is not None:
timeout = int(timeout)
timeout *= 2
def to(timeout):
time.sleep(.5)
if timeout is not None:
if timeout > 1:
timeout -= 1
return timeout
else:
return "TO"
if self.fstatus == "STOPPED":
while 1:
self.stat = win32service.QueryServiceStatus(self.handle)
if self.stat[1]==win32service.SERVICE_STOPPED:
self.fstate = "STOPPED"
break
else:
timeout=to(timeout)
if timeout == "TO":
return "TIMEDOUT"
break
elif self.fstatus == "STOPPING":
while 1:
self.stat = win32service.QueryServiceStatus(self.handle)
if self.stat[1]==win32service.SERVICE_STOP_PENDING:
self.fstate = "STOPPING"
break
else:
timeout=to(timeout)
if timeout == "TO":
return "TIMEDOUT"
break
elif self.fstatus == "RUNNING":
while 1:
self.stat = win32service.QueryServiceStatus(self.handle)
if self.stat[1]==win32service.SERVICE_RUNNING:
self.fstate = "RUNNING"
break
else:
timeout=to(timeout)
if timeout == "TO":
return "TIMEDOUT"
break
elif self.fstatus == "STARTING":
while 1:
self.stat = win32service.QueryServiceStatus(self.handle)
if self.stat[1]==win32service.SERVICE_START_PENDING:
self.fstate = "STARTING"
break
else:
timeout=to(timeout)
if timeout == "TO":
return "TIMEDOUT"
break
def infotype(self):
self.stat = win32service.QueryServiceStatus(self.handle)
if self.stat[0] and win32service.SERVICE_WIN32_OWN_PROCESS:
print("The %s service runs in its own process." % self.lserv)
if self.stat[0] and win32service.SERVICE_WIN32_SHARE_PROCESS:
print("The %s service shares a process with other services." % self.lserv)
if self.stat[0] and win32service.SERVICE_INTERACTIVE_PROCESS:
print("The %s service can interact with the desktop." % self.lserv)
def infoctrl(self):
self.stat = win32service.QueryServiceStatus(self.handle)
if self.stat[2] and win32service.SERVICE_ACCEPT_PAUSE_CONTINUE:
print("The %s service can be paused." % self.lserv)
if self.stat[2] and win32service.SERVICE_ACCEPT_STOP:
print("The %s service can be stopped." % self.lserv)
if self.stat[2] and win32service.SERVICE_ACCEPT_SHUTDOWN:
print("The %s service can be shutdown." % self.lserv)
def infostartup(self):
self.isuphandle = win32api.RegOpenKeyEx(win32con.HKEY_LOCAL_MACHINE, self.sccss + self.sserv, 0, win32con.KEY_READ)
self.isuptype = win32api.RegQueryValueEx(self.isuphandle, "Start")[0]
win32api.RegCloseKey(self.isuphandle)
if self.isuptype == 0:
return "boot"
elif self.isuptype == 1:
return "system"
elif self.isuptype == 2:
return "automatic"
elif self.isuptype == 3:
return "manual"
elif self.isuptype == 4:
return "disabled"
@property
def suptype(self):
types = 'boot', 'system', 'automatic', 'manual', 'disabled'
lookup = dict((name, number) for number, name in enumerate(types))
return lookup[self.startuptype]
def setstartup(self, startuptype):
self.startuptype = startuptype.lower()
self.snc = win32service.SERVICE_NO_CHANGE
win32service.ChangeServiceConfig(self.handle, self.snc, self.suptype,
self.snc, None, None, 0, None, None, None, self.lserv)
def getname(self):
self.snames=win32service.EnumServicesStatus(self.scmhandle)
for i in self.snames:
if i[0].lower() == self.userv.lower():
return i[0], i[1]
break
if i[1].lower() == self.userv.lower():
return i[0], i[1]
break
print("Error: The %s service doesn't seem to exist." % self.userv)
return None, None

View file

@ -0,0 +1,12 @@
from .api import shell
def get_recycle_bin_confirm():
settings = shell.SHELLSTATE()
shell.SHGetSetSettings(settings, shell.SSF_NOCONFIRMRECYCLE, False)
return not settings.no_confirm_recycle
def set_recycle_bin_confirm(confirm=False):
settings = shell.SHELLSTATE()
settings.no_confirm_recycle = not confirm
shell.SHGetSetSettings(settings, shell.SSF_NOCONFIRMRECYCLE, True)
# cross fingers and hope it worked

View file

@ -0,0 +1,70 @@
# -*- coding: UTF-8 -*-
"""
timers
In particular, contains a waitable timer.
"""
from __future__ import absolute_import
import time
from six.moves import _thread
from jaraco.windows.api import event as win32event
__author__ = 'Jason R. Coombs <jaraco@jaraco.com>'
class WaitableTimer:
"""
t = WaitableTimer()
t.set(None, 10) # every 10 seconds
t.wait_for_signal() # 10 seconds elapses
t.stop()
t.wait_for_signal(20) # 20 seconds elapses (timeout occurred)
"""
def __init__(self):
self.signal_event = win32event.CreateEvent(None, 0, 0, None)
self.stop_event = win32event.CreateEvent(None, 0, 0, None)
def set(self, due_time, period):
_thread.start_new_thread(self._signal_loop, (due_time, period))
def stop(self):
win32event.SetEvent(self.stop_event)
def wait_for_signal(self, timeout = None):
"""
wait for the signal; return after the signal has occurred or the
timeout in seconds elapses.
"""
timeout_ms = int(timeout*1000) if timeout else win32event.INFINITE
win32event.WaitForSingleObject(self.signal_event, timeout_ms)
def _signal_loop(self, due_time, period):
if not due_time and not period:
raise ValueError("due_time or period must be non-zero")
try:
if not due_time:
due_time = time.time() + period
if due_time:
self._wait(due_time - time.time())
while period:
due_time += period
self._wait(due_time - time.time())
except Exception:
pass
#we're done here, just quit
def _wait(self, seconds):
milliseconds = int(seconds*1000)
if milliseconds > 0:
res = win32event.WaitForSingleObject(self.stop_event, milliseconds)
if res == win32event.WAIT_OBJECT_0: raise Exception
if res == win32event.WAIT_TIMEOUT: pass
win32event.SetEvent(self.signal_event)
@staticmethod
def get_even_due_time(period):
now = time.time()
return now - (now % period)

View file

@ -0,0 +1,242 @@
#!/usr/bin/env python
import operator
import ctypes
import datetime
from ctypes.wintypes import WORD, WCHAR, BOOL, LONG
from jaraco.windows.util import Extended
from jaraco.collections import RangeMap
class AnyDict(object):
"A dictionary that returns the same value regardless of key"
def __init__(self, value):
self.value = value
def __getitem__(self, key):
return self.value
class SYSTEMTIME(Extended, ctypes.Structure):
_fields_ = [
('year', WORD),
('month', WORD),
('day_of_week', WORD),
('day', WORD),
('hour', WORD),
('minute', WORD),
('second', WORD),
('millisecond', WORD),
]
class REG_TZI_FORMAT(Extended, ctypes.Structure):
_fields_ = [
('bias', LONG),
('standard_bias', LONG),
('daylight_bias', LONG),
('standard_start', SYSTEMTIME),
('daylight_start', SYSTEMTIME),
]
class TIME_ZONE_INFORMATION(Extended, ctypes.Structure):
_fields_ = [
('bias', LONG),
('standard_name', WCHAR*32),
('standard_start', SYSTEMTIME),
('standard_bias', LONG),
('daylight_name', WCHAR*32),
('daylight_start', SYSTEMTIME),
('daylight_bias', LONG),
]
class DYNAMIC_TIME_ZONE_INFORMATION(TIME_ZONE_INFORMATION):
"""
Because the structure of the DYNAMIC_TIME_ZONE_INFORMATION extends
the structure of the TIME_ZONE_INFORMATION, this structure
can be used as a drop-in replacement for calls where the
structure is passed by reference.
For example,
dynamic_tzi = DYNAMIC_TIME_ZONE_INFORMATION()
ctypes.windll.kernel32.GetTimeZoneInformation(ctypes.byref(dynamic_tzi))
(although the key_name and dynamic_daylight_time_disabled flags will be
set to the default (null)).
>>> isinstance(DYNAMIC_TIME_ZONE_INFORMATION(), TIME_ZONE_INFORMATION)
True
"""
_fields_ = [
# ctypes automatically includes the fields from the parent
('key_name', WCHAR*128),
('dynamic_daylight_time_disabled', BOOL),
]
def __init__(self, *args, **kwargs):
"""Allow initialization from args from both this class and
its superclass. Default ctypes implementation seems to
assume that this class is only initialized with its own
_fields_ (for non-keyword-args)."""
super_self = super(DYNAMIC_TIME_ZONE_INFORMATION, self)
super_fields = super_self._fields_
super_args = args[:len(super_fields)]
self_args = args[len(super_fields):]
# convert the super args to keyword args so they're also handled
for field, arg in zip(super_fields, super_args):
field_name, spec = field
kwargs[field_name] = arg
super(DYNAMIC_TIME_ZONE_INFORMATION, self).__init__(*self_args, **kwargs)
class Info(DYNAMIC_TIME_ZONE_INFORMATION):
"""
A time zone definition class based on the win32
DYNAMIC_TIME_ZONE_INFORMATION structure.
Describes a bias against UTC (bias), and two dates at which a separate
additional bias applies (standard_bias and daylight_bias).
"""
def field_names(self):
return map(operator.itemgetter(0), self._fields_)
def __init__(self, *args, **kwargs):
"""
Try to construct a timezone.Info from
a) [DYNAMIC_]TIME_ZONE_INFORMATION args
b) another Info
c) a REG_TZI_FORMAT
d) a byte structure
"""
funcs = (
super(Info, self).__init__,
self.__init_from_other,
self.__init_from_reg_tzi,
self.__init_from_bytes,
)
for func in funcs:
try:
func(*args, **kwargs)
return
except TypeError:
pass
raise TypeError("Invalid arguments for %s" % self.__class__)
def __init_from_bytes(self, bytes, **kwargs):
reg_tzi = REG_TZI_FORMAT()
# todo: use buffer API in Python 3
buffer = buffer(bytes)
ctypes.memmove(ctypes.addressof(reg_tzi), buffer, len(buffer))
self.__init_from_reg_tzi(self, reg_tzi, **kwargs)
def __init_from_reg_tzi(self, reg_tzi, **kwargs):
if not isinstance(reg_tzi, REG_TZI_FORMAT):
raise TypeError("Not a REG_TZI_FORMAT")
for field_name, type in reg_tzi._fields_:
setattr(self, field_name, getattr(reg_tzi, field_name))
for name, value in kwargs.items():
setattr(self, name, value)
def __init_from_other(self, other):
if not isinstance(other, TIME_ZONE_INFORMATION):
raise TypeError("Not a TIME_ZONE_INFORMATION")
for name in other.field_names():
# explicitly get the value from the underlying structure
value = super(Info, other).__getattribute__(other, name)
setattr(self, name, value)
# consider instead of the loop above just copying the memory directly
#size = max(ctypes.sizeof(DYNAMIC_TIME_ZONE_INFO), ctypes.sizeof(other))
#ctypes.memmove(ctypes.addressof(self), other, size)
def __getattribute__(self, attr):
value = super(Info, self).__getattribute__(attr)
make_minute_timedelta = lambda m: datetime.timedelta(minutes = m)
if 'bias' in attr:
value = make_minute_timedelta(value)
return value
@classmethod
def current(class_):
"Windows Platform SDK GetTimeZoneInformation"
tzi = class_()
kernel32 = ctypes.windll.kernel32
getter = kernel32.GetTimeZoneInformation
getter = getattr(kernel32, 'GetDynamicTimeZoneInformation', getter)
code = getter(ctypes.byref(tzi))
return code, tzi
def set(self):
kernel32 = ctypes.windll.kernel32
setter = kernel32.SetTimeZoneInformation
setter = getattr(kernel32, 'SetDynamicTimeZoneInformation', setter)
return setter(ctypes.byref(self))
def copy(self):
return self.__class__(self)
def locate_daylight_start(self, year):
info = self.get_info_for_year(year)
return self._locate_day(year, info.daylight_start)
def locate_standard_start(self, year):
info = self.get_info_for_year(year)
return self._locate_day(year, info.standard_start)
def get_info_for_year(self, year):
return self.dynamic_info[year]
@property
def dynamic_info(self):
"Return a map that for a given year will return the correct Info"
if self.key_name:
dyn_key = self.get_key().subkey('Dynamic DST')
del dyn_key['FirstEntry']
del dyn_key['LastEntry']
years = map(int, dyn_key.keys())
values = map(Info, dyn_key.values())
# create a range mapping that searches by descending year and matches
# if the target year is greater or equal.
return RangeMap(zip(years, values), RangeMap.descending, operator.ge)
else:
return AnyDict(self)
@staticmethod
def _locate_day(year, cutoff):
"""
Takes a SYSTEMTIME object, such as retrieved from a TIME_ZONE_INFORMATION
structure or call to GetTimeZoneInformation and interprets it based on the given
year to identify the actual day.
This method is necessary because the SYSTEMTIME structure refers to a day by its
day of the week and week of the month (e.g. 4th saturday in March).
>>> SATURDAY = 6
>>> MARCH = 3
>>> st = SYSTEMTIME(2000, MARCH, SATURDAY, 4, 0, 0, 0, 0)
# according to my calendar, the 4th Saturday in March in 2009 was the 28th
>>> expected_date = datetime.datetime(2009, 3, 28)
>>> Info._locate_day(2009, st) == expected_date
True
"""
# MS stores Sunday as 0, Python datetime stores Monday as zero
target_weekday = (cutoff.day_of_week + 6) % 7
# For SYSTEMTIMEs relating to time zone inforamtion, cutoff.day
# is the week of the month
week_of_month = cutoff.day
# so the following is the first day of that week
day = (week_of_month - 1) * 7 + 1
result = datetime.datetime(year, cutoff.month, day,
cutoff.hour, cutoff.minute, cutoff.second, cutoff.millisecond)
# now the result is the correct week, but not necessarily the correct day of the week
days_to_go = (target_weekday - result.weekday()) % 7
result += datetime.timedelta(days_to_go)
# if we selected a day in the month following the target month,
# move back a week or two.
# This is necessary because Microsoft defines the fifth week in a month
# to be the last week in a month and adding the time delta might have
# pushed the result into the next month.
while result.month == cutoff.month + 1:
result -= datetime.timedelta(weeks = 1)
return result

View file

@ -0,0 +1,8 @@
#!/usr/bin/env python
import ctypes
from jaraco.windows.util import ensure_unicode
def MessageBox(text, caption=None, handle=None, type=None):
text, caption = map(ensure_unicode, (text, caption))
ctypes.windll.user32.MessageBoxW(handle, text, caption, type)

View file

@ -0,0 +1,15 @@
import ctypes
from .api import errors
from .api.user import GetUserName
from .error import WindowsError, handle_nonzero_success
def get_user_name():
size = ctypes.wintypes.DWORD()
try:
handle_nonzero_success(GetUserName(None, size))
except WindowsError as e:
if e.code != errors.ERROR_INSUFFICIENT_BUFFER:
raise
buffer = ctypes.create_unicode_buffer(size.value)
handle_nonzero_success(GetUserName(buffer, size))
return buffer.value

View file

@ -0,0 +1,18 @@
#!/usr/bin/env python
import ctypes
def ensure_unicode(param):
try:
param = ctypes.create_unicode_buffer(param)
except TypeError:
pass # just return the param as is
return param
class Extended(object):
"Used to add extended capability to structures"
def __eq__(self, other):
return buffer(self) == buffer(other)
def __ne__(self, other):
return buffer(self) != buffer(other)

View file

@ -0,0 +1,14 @@
import os
from path import path
def install_pptp(name, param_lines):
"""
"""
# or consider using the API: http://msdn.microsoft.com/en-us/library/aa446739%28v=VS.85%29.aspx
pbk_path = (path(os.environ['PROGRAMDATA'])
/ 'Microsoft' / 'Network' / 'Connections' / 'pbk' / 'rasphone.pbk')
pbk_path.dirname().makedirs_p()
with open(pbk_path, 'a') as pbk:
pbk.write('[{name}]\n'.format(name=name))
pbk.writelines(param_lines)
pbk.write('\n')

View file

@ -0,0 +1,97 @@
#!python
from __future__ import print_function
import ctypes
from jaraco.windows.error import handle_nonzero_success
from jaraco.windows.api import system
from jaraco.ui.cmdline import Command
def set(value):
result = system.SystemParametersInfo(
system.SPI_SETACTIVEWINDOWTRACKING,
0,
ctypes.cast(value, ctypes.c_void_p),
0,
)
handle_nonzero_success(result)
def get():
value = ctypes.wintypes.BOOL()
result = system.SystemParametersInfo(
system.SPI_GETACTIVEWINDOWTRACKING,
0,
ctypes.byref(value),
0,
)
handle_nonzero_success(result)
return bool(value)
def set_delay(milliseconds):
result = system.SystemParametersInfo(
system.SPI_SETACTIVEWNDTRKTIMEOUT,
0,
ctypes.cast(milliseconds, ctypes.c_void_p),
0,
)
handle_nonzero_success(result)
def get_delay():
value = ctypes.wintypes.DWORD()
result = system.SystemParametersInfo(
system.SPI_GETACTIVEWNDTRKTIMEOUT,
0,
ctypes.byref(value),
0,
)
handle_nonzero_success(result)
return int(value.value)
class DelayParam(Command):
@staticmethod
def add_arguments(parser):
parser.add_argument(
'-d', '--delay', type=int,
help="Delay in milliseconds for active window tracking"
)
class Show(Command):
@classmethod
def run(cls, args):
msg = "xmouse: {enabled} (delay {delay}ms)".format(
enabled=get(),
delay=get_delay(),
)
print(msg)
class Enable(DelayParam):
@classmethod
def run(cls, args):
print("enabling xmouse")
set(True)
args.delay and set_delay(args.delay)
class Disable(DelayParam):
@classmethod
def run(cls, args):
print("disabling xmouse")
set(False)
args.delay and set_delay(args.delay)
class Toggle(DelayParam):
@classmethod
def run(cls, args):
value = get()
print("xmouse: %s -> %s" % (value, not value))
set(not value)
args.delay and set_delay(args.delay)
if __name__ == '__main__':
Command.invoke()