plexpy/lib/win32com/test/testADOEvents.py
dependabot[bot] faef9a94c4
Bump cherrypy from 18.8.0 to 18.9.0 (#2266)
* Bump cherrypy from 18.8.0 to 18.9.0

Bumps [cherrypy](https://github.com/cherrypy/cherrypy) from 18.8.0 to 18.9.0.
- [Changelog](https://github.com/cherrypy/cherrypy/blob/main/CHANGES.rst)
- [Commits](https://github.com/cherrypy/cherrypy/compare/v18.8.0...v18.9.0)

---
updated-dependencies:
- dependency-name: cherrypy
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Update cherrypy==18.9.0

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com>

[skip ci]
2024-03-24 15:25:44 -07:00

100 lines
2.7 KiB
Python

import os
import time
import pythoncom
from win32com.client import Dispatch, DispatchWithEvents, constants
finished = 0 # Flag for the wait loop from (3) to test
class ADOEvents: # event handler class
def OnWillConnect(self, str, user, pw, opt, sts, cn):
# Must have this event, as if it is not handled, ADO assumes the
# operation is cancelled, and raises an error (Operation cancelled
# by the user)
pass
def OnConnectComplete(self, error, status, connection):
# Assume no errors, until we have the basic stuff
# working. Now, "connection" should be an open
# connection to my data source
# Do the "something" from (2). For now, just
# print the connection data source
print("connection is", connection)
print("Connected to", connection.Properties("Data Source"))
# OK, our work is done. Let the main loop know
global finished
finished = 1
def OnCommitTransComplete(self, pError, adStatus, pConnection):
pass
def OnInfoMessage(self, pError, adStatus, pConnection):
pass
def OnDisconnect(self, adStatus, pConnection):
pass
def OnBeginTransComplete(self, TransactionLevel, pError, adStatus, pConnection):
pass
def OnRollbackTransComplete(self, pError, adStatus, pConnection):
pass
def OnExecuteComplete(
self, RecordsAffected, pError, adStatus, pCommand, pRecordset, pConnection
):
pass
def OnWillExecute(
self,
Source,
CursorType,
LockType,
Options,
adStatus,
pCommand,
pRecordset,
pConnection,
):
pass
def TestConnection(dbname):
# Create the ADO connection object, and link the event
# handlers into it
c = DispatchWithEvents("ADODB.Connection", ADOEvents)
# Initiate the asynchronous open
dsn = "Driver={Microsoft Access Driver (*.mdb)};Dbq=%s" % dbname
user = "system"
pw = "manager"
c.Open(dsn, user, pw, constants.adAsyncConnect)
# Sit in a loop, until our event handler (above) sets the
# "finished" flag or we time out.
end_time = time.clock() + 10
while time.clock() < end_time:
# Pump messages so that COM gets a look in
pythoncom.PumpWaitingMessages()
if not finished:
print("XXX - Failed to connect!")
def Test():
from . import testAccess
try:
testAccess.GenerateSupport()
except pythoncom.com_error:
print("*** Can not import the MSAccess type libraries - tests skipped")
return
dbname = testAccess.CreateTestAccessDatabase()
try:
TestConnection(dbname)
finally:
os.unlink(dbname)
if __name__ == "__main__":
Test()