From 094fe555b8f60acf4c379e659e9075da07ae8ac4 Mon Sep 17 00:00:00 2001 From: Labrys of Knossos Date: Sat, 5 Jan 2019 16:38:33 -0500 Subject: [PATCH] Clean up network utils --- core/utils/network.py | 74 ++++++++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/core/utils/network.py b/core/utils/network.py index 4409c486..dae99837 100644 --- a/core/utils/network.py +++ b/core/utils/network.py @@ -6,51 +6,61 @@ import core from core import logger -# Wake function +def make_wake_on_lan_packet(mac_address): + """Build the Wake-On-LAN 'Magic Packet'.""" + address = ( + int(value, 16) + for value in mac_address.split(':') + ) + fmt = 'BBBBBB' + hardware_address = struct.pack(fmt, *address) + broadcast_address = b'\xFF' * 6 # FF:FF:FF:FF:FF:FF + return broadcast_address + hardware_address * 16 + + def wake_on_lan(ethernet_address): - addr_byte = ethernet_address.split(':') - hw_addr = struct.pack(b'BBBBBB', int(addr_byte[0], 16), - int(addr_byte[1], 16), - int(addr_byte[2], 16), - int(addr_byte[3], 16), - int(addr_byte[4], 16), - int(addr_byte[5], 16)) - - # Build the Wake-On-LAN 'Magic Packet'... - - msg = b'\xff' * 6 + hw_addr * 16 + """Send a WakeOnLan request.""" + # Create the WoL magic packet + magic_packet = make_wake_on_lan_packet(ethernet_address) # ...and send it to the broadcast address using UDP + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as connection: + connection.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + connection.sendto(magic_packet, ('', 9)) - ss = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - ss.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - ss.sendto(msg, ('', 9)) - ss.close() + logger.info('WakeOnLan sent for mac: {0}'.format(ethernet_address)) -# Test Connection function def test_connection(host, port): + """Test network connection.""" + address = host, port try: - socket.create_connection((host, port)) - return 'Up' - except Exception: + socket.create_connection(address) + except socket.error: return 'Down' + else: + return 'Up' def wake_up(): - host = core.CFG['WakeOnLan']['host'] - port = int(core.CFG['WakeOnLan']['port']) - mac = core.CFG['WakeOnLan']['mac'] + wol = core.CFG['WakeOnLan'] + host = wol['host'] + port = int(wol['port']) + mac = wol['mac'] + max_attempts = 4 - i = 1 - while test_connection(host, port) == 'Down' and i < 4: - logger.info(('Sending WakeOnLan Magic Packet for mac: {0}'.format(mac))) + logger.info('Trying to wake On lan.') + + for attempt in range(0, max_attempts): + logger.info('Attempt {0} of {1}'.format(attempt + 1, max_attempts, mac)) + if test_connection(host, port) == 'Up': + logger.info('System with mac: {0} has been woken.'.format(mac)) + break wake_on_lan(mac) time.sleep(20) - i = i + 1 - - if test_connection(host, port) == 'Down': # final check. - logger.warning('System with mac: {0} has not woken after 3 attempts. ' - 'Continuing with the rest of the script.'.format(mac)) else: - logger.info('System with mac: {0} has been woken. Continuing with the rest of the script.'.format(mac)) + if test_connection(host, port) == 'Down': # final check. + msg = 'System with mac: {0} has not woken after {1} attempts.' + logger.warning(msg.format(mac, max_attempts)) + + logger.info('Continuing with the rest of the script.')