Bump cherrypy from 18.8.0 to 18.9.0 (#2266)

* Bump cherrypy from 18.8.0 to 18.9.0

Bumps [cherrypy](https://github.com/cherrypy/cherrypy) from 18.8.0 to 18.9.0.
- [Changelog](https://github.com/cherrypy/cherrypy/blob/main/CHANGES.rst)
- [Commits](https://github.com/cherrypy/cherrypy/compare/v18.8.0...v18.9.0)

---
updated-dependencies:
- dependency-name: cherrypy
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Update cherrypy==18.9.0

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com>

[skip ci]
This commit is contained in:
dependabot[bot] 2024-03-24 15:25:44 -07:00 committed by GitHub
commit faef9a94c4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
673 changed files with 159850 additions and 11583 deletions

View file

@ -0,0 +1,63 @@
# This is an example of a service hosted by python.exe rather than
# pythonservice.exe.
# Note that it is very rare that using python.exe is a better option
# than the default pythonservice.exe - the latter has better error handling
# so that if Python itself can't be initialized or there are very early
# import errors, you will get error details written to the event log. When
# using python.exe instead, you are forced to wait for the interpreter startup
# and imports to succeed before you are able to effectively setup your own
# error handling.
# So in short, please make sure you *really* want to do this, otherwise just
# stick with the default.
import os
import sys
import servicemanager
import win32serviceutil
from pipeTestService import TestPipeService
class NativeTestPipeService(TestPipeService):
_svc_name_ = "PyNativePipeTestService"
_svc_display_name_ = "Python Native Pipe Test Service"
_svc_description_ = "Tests Python.exe hosted services"
# tell win32serviceutil we have a custom executable and custom args
# so registration does the right thing.
_exe_name_ = sys.executable
_exe_args_ = '"' + os.path.abspath(sys.argv[0]) + '"'
def main():
if len(sys.argv) == 1:
# service must be starting...
print("service is starting...")
print("(execute this script with '--help' if that isn't what you want)")
# for the sake of debugging etc, we use win32traceutil to see
# any unhandled exceptions and print statements.
import win32traceutil
print("service is still starting...")
servicemanager.Initialize()
servicemanager.PrepareToHostSingle(NativeTestPipeService)
# Now ask the service manager to fire things up for us...
servicemanager.StartServiceCtrlDispatcher()
print("service done!")
else:
win32serviceutil.HandleCommandLine(NativeTestPipeService)
if __name__ == "__main__":
try:
main()
except (SystemExit, KeyboardInterrupt):
raise
except:
print("Something went bad!")
import traceback
traceback.print_exc()

View file

@ -0,0 +1,184 @@
# A Demo of services and named pipes.
# A multi-threaded service that simply echos back its input.
# * Install as a service using "pipeTestService.py install"
# * Use Control Panel to change the user name of the service
# to a real user name (ie, NOT the SystemAccount)
# * Start the service.
# * Run the "pipeTestServiceClient.py" program as the client pipe side.
import _thread
import traceback
# Old versions of the service framework would not let you import this
# module at the top-level. Now you can, and can check 'Debugging()' and
# 'RunningAsService()' to check your context.
import pywintypes
import servicemanager
import win32con
import win32service
import win32serviceutil
import winerror
from ntsecuritycon import *
from win32api import *
# Use "import *" to keep this looking as much as a "normal" service
# as possible. Real code shouldn't do this.
from win32event import *
from win32file import *
from win32pipe import *
def ApplyIgnoreError(fn, args):
try:
return fn(*args)
except error: # Ignore win32api errors.
return None
class TestPipeService(win32serviceutil.ServiceFramework):
_svc_name_ = "PyPipeTestService"
_svc_display_name_ = "Python Pipe Test Service"
_svc_description_ = "Tests Python service framework by receiving and echoing messages over a named pipe"
def __init__(self, args):
win32serviceutil.ServiceFramework.__init__(self, args)
self.hWaitStop = CreateEvent(None, 0, 0, None)
self.overlapped = pywintypes.OVERLAPPED()
self.overlapped.hEvent = CreateEvent(None, 0, 0, None)
self.thread_handles = []
def CreatePipeSecurityObject(self):
# Create a security object giving World read/write access,
# but only "Owner" modify access.
sa = pywintypes.SECURITY_ATTRIBUTES()
sidEveryone = pywintypes.SID()
sidEveryone.Initialize(SECURITY_WORLD_SID_AUTHORITY, 1)
sidEveryone.SetSubAuthority(0, SECURITY_WORLD_RID)
sidCreator = pywintypes.SID()
sidCreator.Initialize(SECURITY_CREATOR_SID_AUTHORITY, 1)
sidCreator.SetSubAuthority(0, SECURITY_CREATOR_OWNER_RID)
acl = pywintypes.ACL()
acl.AddAccessAllowedAce(FILE_GENERIC_READ | FILE_GENERIC_WRITE, sidEveryone)
acl.AddAccessAllowedAce(FILE_ALL_ACCESS, sidCreator)
sa.SetSecurityDescriptorDacl(1, acl, 0)
return sa
# The functions executed in their own thread to process a client request.
def DoProcessClient(self, pipeHandle, tid):
try:
try:
# Create a loop, reading large data. If we knew the data stream was
# was small, a simple ReadFile would do.
d = "".encode("ascii") # ensure bytes on py2k and py3k...
hr = winerror.ERROR_MORE_DATA
while hr == winerror.ERROR_MORE_DATA:
hr, thisd = ReadFile(pipeHandle, 256)
d = d + thisd
print("Read", d)
ok = 1
except error:
# Client disconnection - do nothing
ok = 0
# A secure service would handle (and ignore!) errors writing to the
# pipe, but for the sake of this demo we dont (if only to see what errors
# we can get when our clients break at strange times :-)
if ok:
msg = (
"%s (on thread %d) sent me %s"
% (GetNamedPipeHandleState(pipeHandle, False, True)[4], tid, d)
).encode("ascii")
WriteFile(pipeHandle, msg)
finally:
ApplyIgnoreError(DisconnectNamedPipe, (pipeHandle,))
ApplyIgnoreError(CloseHandle, (pipeHandle,))
def ProcessClient(self, pipeHandle):
try:
procHandle = GetCurrentProcess()
th = DuplicateHandle(
procHandle,
GetCurrentThread(),
procHandle,
0,
0,
win32con.DUPLICATE_SAME_ACCESS,
)
try:
self.thread_handles.append(th)
try:
return self.DoProcessClient(pipeHandle, th)
except:
traceback.print_exc()
finally:
self.thread_handles.remove(th)
except:
traceback.print_exc()
def SvcStop(self):
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
SetEvent(self.hWaitStop)
def SvcDoRun(self):
# Write an event log record - in debug mode we will also
# see this message printed.
servicemanager.LogMsg(
servicemanager.EVENTLOG_INFORMATION_TYPE,
servicemanager.PYS_SERVICE_STARTED,
(self._svc_name_, ""),
)
num_connections = 0
while 1:
pipeHandle = CreateNamedPipe(
"\\\\.\\pipe\\PyPipeTest",
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_MESSAGE | PIPE_READMODE_BYTE,
PIPE_UNLIMITED_INSTANCES, # max instances
0,
0,
6000,
self.CreatePipeSecurityObject(),
)
try:
hr = ConnectNamedPipe(pipeHandle, self.overlapped)
except error as details:
print("Error connecting pipe!", details)
CloseHandle(pipeHandle)
break
if hr == winerror.ERROR_PIPE_CONNECTED:
# Client is already connected - signal event
SetEvent(self.overlapped.hEvent)
rc = WaitForMultipleObjects(
(self.hWaitStop, self.overlapped.hEvent), 0, INFINITE
)
if rc == WAIT_OBJECT_0:
# Stop event
break
else:
# Pipe event - spawn thread to deal with it.
_thread.start_new_thread(self.ProcessClient, (pipeHandle,))
num_connections = num_connections + 1
# Sleep to ensure that any new threads are in the list, and then
# wait for all current threads to finish.
# What is a better way?
Sleep(500)
while self.thread_handles:
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING, 5000)
print("Waiting for %d threads to finish..." % (len(self.thread_handles)))
WaitForMultipleObjects(self.thread_handles, 1, 3000)
# Write another event log record.
servicemanager.LogMsg(
servicemanager.EVENTLOG_INFORMATION_TYPE,
servicemanager.PYS_SERVICE_STOPPED,
(self._svc_name_, " after processing %d connections" % (num_connections,)),
)
if __name__ == "__main__":
win32serviceutil.HandleCommandLine(TestPipeService)

View file

@ -0,0 +1,156 @@
# A Test Program for pipeTestService.py
#
# Install and start the Pipe Test service, then run this test
# either from the same machine, or from another using the "-s" param.
#
# Eg: pipeTestServiceClient.py -s server_name Hi There
# Should work.
import os
import sys
import traceback
import pywintypes
import win32api
import winerror
from win32event import *
from win32file import *
from win32pipe import *
verbose = 0
# def ReadFromPipe(pipeName):
# Could (Should?) use CallNamedPipe, but this technique allows variable size
# messages (whereas you must supply a buffer size for CallNamedPipe!
# hPipe = CreateFile(pipeName, GENERIC_WRITE, 0, None, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, 0)
# more = 1
# while more:
# hr = ReadFile(hPipe, 256)
# if hr==0:
# more = 0
# except win32api.error (hr, fn, desc):
# if hr==winerror.ERROR_MORE_DATA:
# data = dat
#
def CallPipe(fn, args):
ret = None
retryCount = 0
while retryCount < 8: # Keep looping until user cancels.
retryCount = retryCount + 1
try:
return fn(*args)
except win32api.error as exc:
if exc.winerror == winerror.ERROR_PIPE_BUSY:
win32api.Sleep(5000)
continue
else:
raise
raise RuntimeError("Could not make a connection to the server")
def testClient(server, msg):
if verbose:
print("Sending", msg)
data = CallPipe(
CallNamedPipe,
("\\\\%s\\pipe\\PyPipeTest" % server, msg, 256, NMPWAIT_WAIT_FOREVER),
)
if verbose:
print("Server sent back '%s'" % data)
print("Sent and received a message!")
def testLargeMessage(server, size=4096):
if verbose:
print("Sending message of size %d" % (size))
msg = "*" * size
data = CallPipe(
CallNamedPipe,
("\\\\%s\\pipe\\PyPipeTest" % server, msg, 512, NMPWAIT_WAIT_FOREVER),
)
if len(data) - size:
print("Sizes are all wrong - send %d, got back %d" % (size, len(data)))
def stressThread(server, numMessages, wait):
try:
try:
for i in range(numMessages):
r = CallPipe(
CallNamedPipe,
(
"\\\\%s\\pipe\\PyPipeTest" % server,
"#" * 512,
1024,
NMPWAIT_WAIT_FOREVER,
),
)
except:
traceback.print_exc()
print("Failed after %d messages" % i)
finally:
SetEvent(wait)
def stressTestClient(server, numThreads, numMessages):
import _thread
thread_waits = []
for t_num in range(numThreads):
# Note I could just wait on thread handles (after calling DuplicateHandle)
# See the service itself for an example of waiting for the clients...
wait = CreateEvent(None, 0, 0, None)
thread_waits.append(wait)
_thread.start_new_thread(stressThread, (server, numMessages, wait))
# Wait for all threads to finish.
WaitForMultipleObjects(thread_waits, 1, INFINITE)
def main():
import getopt
import sys
server = "."
thread_count = 0
msg_count = 500
try:
opts, args = getopt.getopt(sys.argv[1:], "s:t:m:vl")
for o, a in opts:
if o == "-s":
server = a
if o == "-m":
msg_count = int(a)
if o == "-t":
thread_count = int(a)
if o == "-v":
global verbose
verbose = 1
if o == "-l":
testLargeMessage(server)
msg = " ".join(args).encode("mbcs")
except getopt.error as msg:
print(msg)
my_name = os.path.split(sys.argv[0])[1]
print(
"Usage: %s [-v] [-s server] [-t thread_count=0] [-m msg_count=500] msg ..."
% my_name
)
print(" -v = verbose")
print(
" Specifying a value for -t will stress test using that many threads."
)
return
testClient(server, msg)
if thread_count > 0:
print(
"Spawning %d threads each sending %d messages..."
% (thread_count, msg_count)
)
stressTestClient(server, thread_count, msg_count)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,98 @@
# A Demo of a service that takes advantage of the additional notifications
# available in later Windows versions.
# Note that all output is written as event log entries - so you must install
# and start the service, then look at the event log for messages as events
# are generated.
# Events are generated for USB device insertion and removal, power state
# changes and hardware profile events - so try putting your computer to
# sleep and waking it, inserting a memory stick, etc then check the event log
# Most event notification support lives around win32gui
import servicemanager
import win32con
import win32event
import win32gui
import win32gui_struct
import win32service
import win32serviceutil
GUID_DEVINTERFACE_USB_DEVICE = "{A5DCBF10-6530-11D2-901F-00C04FB951ED}"
class EventDemoService(win32serviceutil.ServiceFramework):
_svc_name_ = "PyServiceEventDemo"
_svc_display_name_ = "Python Service Event Demo"
_svc_description_ = (
"Demonstrates a Python service which takes advantage of the extra notifications"
)
def __init__(self, args):
win32serviceutil.ServiceFramework.__init__(self, args)
self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
# register for a device notification - we pass our service handle
# instead of a window handle.
filter = win32gui_struct.PackDEV_BROADCAST_DEVICEINTERFACE(
GUID_DEVINTERFACE_USB_DEVICE
)
self.hdn = win32gui.RegisterDeviceNotification(
self.ssh, filter, win32con.DEVICE_NOTIFY_SERVICE_HANDLE
)
# Override the base class so we can accept additional events.
def GetAcceptedControls(self):
# say we accept them all.
rc = win32serviceutil.ServiceFramework.GetAcceptedControls(self)
rc |= (
win32service.SERVICE_ACCEPT_PARAMCHANGE
| win32service.SERVICE_ACCEPT_NETBINDCHANGE
| win32service.SERVICE_CONTROL_DEVICEEVENT
| win32service.SERVICE_ACCEPT_HARDWAREPROFILECHANGE
| win32service.SERVICE_ACCEPT_POWEREVENT
| win32service.SERVICE_ACCEPT_SESSIONCHANGE
)
return rc
# All extra events are sent via SvcOtherEx (SvcOther remains as a
# function taking only the first args for backwards compat)
def SvcOtherEx(self, control, event_type, data):
# This is only showing a few of the extra events - see the MSDN
# docs for "HandlerEx callback" for more info.
if control == win32service.SERVICE_CONTROL_DEVICEEVENT:
info = win32gui_struct.UnpackDEV_BROADCAST(data)
msg = "A device event occurred: %x - %s" % (event_type, info)
elif control == win32service.SERVICE_CONTROL_HARDWAREPROFILECHANGE:
msg = "A hardware profile changed: type=%s, data=%s" % (event_type, data)
elif control == win32service.SERVICE_CONTROL_POWEREVENT:
msg = "A power event: setting %s" % data
elif control == win32service.SERVICE_CONTROL_SESSIONCHANGE:
# data is a single elt tuple, but this could potentially grow
# in the future if the win32 struct does
msg = "Session event: type=%s, data=%s" % (event_type, data)
else:
msg = "Other event: code=%d, type=%s, data=%s" % (control, event_type, data)
servicemanager.LogMsg(
servicemanager.EVENTLOG_INFORMATION_TYPE,
0xF000, # generic message
(msg, ""),
)
def SvcStop(self):
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
win32event.SetEvent(self.hWaitStop)
def SvcDoRun(self):
# do nothing at all - just wait to be stopped
win32event.WaitForSingleObject(self.hWaitStop, win32event.INFINITE)
# Write a stop message.
servicemanager.LogMsg(
servicemanager.EVENTLOG_INFORMATION_TYPE,
servicemanager.PYS_SERVICE_STOPPED,
(self._svc_name_, ""),
)
if __name__ == "__main__":
win32serviceutil.HandleCommandLine(EventDemoService)