mirror of
https://github.com/clinton-hall/nzbToMedia.git
synced 2025-08-14 18:47:09 -07:00
Refactor network utils to utils.network
This commit is contained in:
parent
84061fea2f
commit
42553df5cb
2 changed files with 57 additions and 52 deletions
|
@ -5,9 +5,7 @@ from __future__ import print_function, unicode_literals
|
|||
import os
|
||||
import re
|
||||
import shutil
|
||||
import socket
|
||||
import stat
|
||||
import struct
|
||||
import time
|
||||
|
||||
from babelfish import Language
|
||||
|
@ -21,6 +19,7 @@ import subliminal
|
|||
import core
|
||||
from core import extractor, logger
|
||||
from core.utils.download_info import get_download_info, update_download_info_status
|
||||
from core.utils.network import test_connection, wake_on_lan, wake_up
|
||||
from core.utils.parsers import (
|
||||
parse_args, parse_deluge, parse_other, parse_qbittorrent, parse_rtorrent, parse_transmission,
|
||||
parse_utorrent, parse_vuze,
|
||||
|
@ -319,56 +318,6 @@ def remove_read_only(filename):
|
|||
logger.warning('Cannot change permissions of {file}'.format(file=filename), logger.WARNING)
|
||||
|
||||
|
||||
# Wake function
|
||||
def wake_on_lan(ethernet_address):
|
||||
addr_byte = ethernet_address.split(':')
|
||||
hw_addr = struct.pack(b'BBBBBB', int(addr_byte[0], 16),
|
||||
int(addr_byte[1], 16),
|
||||
int(addr_byte[2], 16),
|
||||
int(addr_byte[3], 16),
|
||||
int(addr_byte[4], 16),
|
||||
int(addr_byte[5], 16))
|
||||
|
||||
# Build the Wake-On-LAN 'Magic Packet'...
|
||||
|
||||
msg = b'\xff' * 6 + hw_addr * 16
|
||||
|
||||
# ...and send it to the broadcast address using UDP
|
||||
|
||||
ss = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
ss.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||
ss.sendto(msg, ('<broadcast>', 9))
|
||||
ss.close()
|
||||
|
||||
|
||||
# Test Connection function
|
||||
def test_connection(host, port):
|
||||
try:
|
||||
socket.create_connection((host, port))
|
||||
return 'Up'
|
||||
except Exception:
|
||||
return 'Down'
|
||||
|
||||
|
||||
def wake_up():
|
||||
host = core.CFG['WakeOnLan']['host']
|
||||
port = int(core.CFG['WakeOnLan']['port'])
|
||||
mac = core.CFG['WakeOnLan']['mac']
|
||||
|
||||
i = 1
|
||||
while test_connection(host, port) == 'Down' and i < 4:
|
||||
logger.info(('Sending WakeOnLan Magic Packet for mac: {0}'.format(mac)))
|
||||
wake_on_lan(mac)
|
||||
time.sleep(20)
|
||||
i = i + 1
|
||||
|
||||
if test_connection(host, port) == 'Down': # final check.
|
||||
logger.warning('System with mac: {0} has not woken after 3 attempts. '
|
||||
'Continuing with the rest of the script.'.format(mac))
|
||||
else:
|
||||
logger.info('System with mac: {0} has been woken. Continuing with the rest of the script.'.format(mac))
|
||||
|
||||
|
||||
def char_replace(name):
|
||||
# Special character hex range:
|
||||
# CP850: 0x80-0xA5 (fortunately not used in ISO-8859-15)
|
||||
|
|
56
core/utils/network.py
Normal file
56
core/utils/network.py
Normal file
|
@ -0,0 +1,56 @@
|
|||
import socket
|
||||
import struct
|
||||
import time
|
||||
|
||||
import core
|
||||
from core import logger
|
||||
|
||||
|
||||
# Wake function
|
||||
def wake_on_lan(ethernet_address):
|
||||
addr_byte = ethernet_address.split(':')
|
||||
hw_addr = struct.pack(b'BBBBBB', int(addr_byte[0], 16),
|
||||
int(addr_byte[1], 16),
|
||||
int(addr_byte[2], 16),
|
||||
int(addr_byte[3], 16),
|
||||
int(addr_byte[4], 16),
|
||||
int(addr_byte[5], 16))
|
||||
|
||||
# Build the Wake-On-LAN 'Magic Packet'...
|
||||
|
||||
msg = b'\xff' * 6 + hw_addr * 16
|
||||
|
||||
# ...and send it to the broadcast address using UDP
|
||||
|
||||
ss = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
ss.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||
ss.sendto(msg, ('<broadcast>', 9))
|
||||
ss.close()
|
||||
|
||||
|
||||
# Test Connection function
|
||||
def test_connection(host, port):
|
||||
try:
|
||||
socket.create_connection((host, port))
|
||||
return 'Up'
|
||||
except Exception:
|
||||
return 'Down'
|
||||
|
||||
|
||||
def wake_up():
|
||||
host = core.CFG['WakeOnLan']['host']
|
||||
port = int(core.CFG['WakeOnLan']['port'])
|
||||
mac = core.CFG['WakeOnLan']['mac']
|
||||
|
||||
i = 1
|
||||
while test_connection(host, port) == 'Down' and i < 4:
|
||||
logger.info(('Sending WakeOnLan Magic Packet for mac: {0}'.format(mac)))
|
||||
wake_on_lan(mac)
|
||||
time.sleep(20)
|
||||
i = i + 1
|
||||
|
||||
if test_connection(host, port) == 'Down': # final check.
|
||||
logger.warning('System with mac: {0} has not woken after 3 attempts. '
|
||||
'Continuing with the rest of the script.'.format(mac))
|
||||
else:
|
||||
logger.info('System with mac: {0} has been woken. Continuing with the rest of the script.'.format(mac))
|
Loading…
Add table
Add a link
Reference in a new issue