Merge pull request #1942 from clinton-hall/get-the-hint

Fix mypy errors
This commit is contained in:
Labrys of Knossos 2022-12-14 04:52:31 -05:00 committed by GitHub
commit 7ce546175e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 79 additions and 86 deletions

View file

@ -9,6 +9,7 @@ import re
import subprocess
import sys
import time
import typing
import eol
@ -32,16 +33,16 @@ APP_ROOT = SOURCE_ROOT.parent
# init preliminaries
SYS_ARGV = sys.argv[1:]
APP_FILENAME = sys.argv[0]
APP_NAME = os.path.basename(APP_FILENAME)
LOG_DIR = os.path.join(APP_ROOT, 'logs')
LOG_FILE = os.path.join(LOG_DIR, 'nzbtomedia.log')
PID_FILE = os.path.join(LOG_DIR, 'nzbtomedia.pid')
CONFIG_FILE = os.path.join(APP_ROOT, 'autoProcessMedia.cfg')
CONFIG_SPEC_FILE = os.path.join(APP_ROOT, 'autoProcessMedia.cfg.spec')
CONFIG_MOVIE_FILE = os.path.join(APP_ROOT, 'autoProcessMovie.cfg')
CONFIG_TV_FILE = os.path.join(APP_ROOT, 'autoProcessTv.cfg')
TEST_FILE = os.path.join(APP_ROOT, 'tests', 'test.mp4')
APP_FILENAME = pathlib.Path(sys.argv[0])
APP_NAME: str = APP_FILENAME.name
LOG_DIR: pathlib.Path = APP_ROOT / 'logs'
LOG_FILE: pathlib.Path = LOG_DIR / 'nzbtomedia.log'
PID_FILE = LOG_DIR / 'nzbtomedia.pid'
CONFIG_FILE = APP_ROOT / 'autoProcessMedia.cfg'
CONFIG_SPEC_FILE = APP_ROOT / 'autoProcessMedia.cfg.spec'
CONFIG_MOVIE_FILE = APP_ROOT / 'autoProcessMovie.cfg'
CONFIG_TV_FILE = APP_ROOT / 'autoProcessTv.cfg'
TEST_FILE = APP_ROOT / 'tests' / 'test.mp4'
MYAPP = None
from core import logger, main_db, version_check, databases, transcoder
@ -91,6 +92,7 @@ TORRENT_CLIENTS = [
'manual',
]
# sickbeard fork/branch constants
FORK_DEFAULT = 'default'
FORK_FAILED = 'failed'
@ -105,7 +107,7 @@ FORK_SICKGEAR = 'SickGear'
FORK_SICKGEAR_API = 'SickGear-api'
FORK_STHENO = 'Stheno'
FORKS = {
FORKS: typing.Mapping[str, typing.Mapping] = {
FORK_DEFAULT: {'dir': None},
FORK_FAILED: {'dirName': None, 'failed': None},
FORK_FAILED_TORRENT: {'dir': None, 'failed': None, 'process_method': None},
@ -201,7 +203,10 @@ ALL_FORKS = {
for k in set(
list(
itertools.chain.from_iterable(
[FORKS[x].keys() for x in FORKS.keys()],
[
FORKS[x].keys()
for x in FORKS.keys()
],
),
),
)
@ -250,7 +255,7 @@ TORRENT_CLIENT_AGENT = None
TORRENT_CLASS = None
USE_LINK = None
OUTPUT_DIRECTORY = None
NOFLATTEN = []
NOFLATTEN: list[str] = []
DELETE_ORIGINAL = None
TORRENT_CHMOD_DIRECTORY = None
TORRENT_DEFAULT_DIRECTORY = None
@ -287,17 +292,17 @@ PLEX_SSL = None
PLEX_HOST = None
PLEX_PORT = None
PLEX_TOKEN = None
PLEX_SECTION = []
PLEX_SECTION: list[str] = []
EXT_CONTAINER = []
EXT_CONTAINER: list[str] = []
COMPRESSED_CONTAINER = []
MEDIA_CONTAINER = []
AUDIO_CONTAINER = []
META_CONTAINER = []
SECTIONS = []
CATEGORIES = []
FORK_SET = []
SECTIONS: list[str] = []
CATEGORIES: list[str] = []
FORK_SET: list[str] = []
MOUNTED = None
GETSUBS = False

View file

@ -38,13 +38,14 @@ def process(
dir_name: str,
input_name: str = '',
status: int = 0,
failed: bool = False,
client_agent: str = 'manual',
download_id: str = '',
input_category: str = '',
failure_link: str = '',
) -> ProcessResult:
# Get configuration
if core.CFG is None:
raise RuntimeError('Configuration not loaded.')
cfg = core.CFG[section][input_category]
# Base URL

View file

@ -38,13 +38,14 @@ def process(
dir_name: str,
input_name: str = '',
status: int = 0,
failed: bool = False,
client_agent: str = 'manual',
download_id: str = '',
input_category: str = '',
failure_link: str = '',
) -> ProcessResult:
# Get configuration
if core.CFG is None:
raise RuntimeError('Configuration not loaded.')
cfg = core.CFG[section][input_category]
# Base URL
@ -113,10 +114,7 @@ def process(
f'{r.status_code}',
)
result = r.text
if not type(result) == list:
result = result.split('\n')
for line in result:
for line in r.text.split('\n'):
if line:
logger.postprocess(line, section)
if 'Post Processing SUCCESSFUL' in line:

View file

@ -38,13 +38,14 @@ def process(
dir_name: str,
input_name: str = '',
status: int = 0,
failed: bool = False,
client_agent: str = 'manual',
download_id: str = '',
input_category: str = '',
failure_link: str = '',
) -> ProcessResult:
# Get configuration
if core.CFG is None:
raise RuntimeError('Configuration not loaded.')
cfg = core.CFG[section][input_category]
# Base URL

View file

@ -32,22 +32,20 @@ from core.utils.paths import remove_dir
from core.utils.network import server_responding
requests.packages.urllib3.disable_warnings()
def process(
*,
section: str,
dir_name: str,
input_name: str = '',
status: int = 0,
failed: bool = False,
client_agent: str = 'manual',
download_id: str = '',
input_category: str = '',
failure_link: str = '',
) -> ProcessResult:
# Get configuration
if core.CFG is None:
raise RuntimeError('Configuration not loaded.')
cfg = core.CFG[section][input_category]
# Base URL
@ -389,7 +387,7 @@ def process(
scan_id = None
elif section == 'Watcher3' and result['status'] == 'finished':
update_movie_status = result['tasks']['update_movie_status']
logger.postprocess('Watcher3 updated status to {}'.format())
logger.postprocess(f'Watcher3 updated status to {section}')
if update_movie_status == 'Finished':
return ProcessResult(
message=f'{section}: Successfully post-processed {input_name}',
@ -578,11 +576,11 @@ def process(
f'{section} will keep searching',
)
# Added a release that was not in the wanted list so confirm rename successful by finding this movie media.list.
# Added a release that was not in the wanted list so confirm rename
# successful by finding this movie media.list.
if not release:
download_id = (
None # we don't want to filter new releases based on this.
)
# we don't want to filter new releases based on this.
download_id = ''
if no_status_check:
return ProcessResult.success(

View file

@ -38,13 +38,14 @@ def process(
dir_name: str,
input_name: str = '',
status: int = 0,
failed: bool = False,
client_agent: str = 'manual',
download_id: str = '',
input_category: str = '',
failure_link: str = '',
) -> ProcessResult:
# Get configuration
if core.CFG is None:
raise RuntimeError('Configuration not loaded.')
cfg = core.CFG[section][input_category]
# Base URL
@ -158,12 +159,11 @@ def process(
else:
logger.debug(f'path: {dir_name}', section)
data = {'name': 'Rename', 'path': dir_name}
data = json.dumps(data)
try:
logger.debug(f'Opening URL: {url} with data: {data}', section)
r = requests.post(
url,
data=data,
data=json.dumps(data),
headers=headers,
stream=True,
verify=False,
@ -261,6 +261,8 @@ def process(
f'support failed downloads',
)
return ProcessResult.failure()
def get_status(url, apikey, dir_name):
logger.debug(

View file

@ -38,13 +38,14 @@ def process(
dir_name: str,
input_name: str = '',
status: int = 0,
failed: bool = False,
client_agent: str = 'manual',
download_id: str = '',
input_category: str = '',
failure_link: str = '',
) -> ProcessResult:
# Get configuration
if core.CFG is None:
raise RuntimeError('Configuration not loaded.')
cfg = core.CFG[section][input_category]
# Base URL
@ -79,7 +80,6 @@ def process(
force = int(cfg.get('force', 0))
delete_on = int(cfg.get('delete_on', 0))
ignore_subs = int(cfg.get('ignore_subs', 0))
status = int(failed)
# Begin processing
@ -185,11 +185,9 @@ def process(
if valid_files == num_files and not status == 0:
logger.info('Found Valid Videos. Setting status Success')
status = 0
failed = 0
if valid_files < num_files and status == 0:
logger.info('Found corrupt videos. Setting status Failed')
status = 1
failed = 1
if (
'NZBOP_VERSION' in os.environ
and os.environ['NZBOP_VERSION'][0:5] >= '14.0'
@ -220,23 +218,19 @@ def process(
logger.info(
'Check for media files ignored because nzbExtractionBy is set to Destination.',
)
if int(failed) == 0:
if status == 0:
logger.info('Setting Status Success.')
status = 0
failed = 0
else:
logger.info(
'Downloader reported an error during download or verification. Processing this as a failed download.',
)
status = 1
failed = 1
else:
logger.warning(
f'No media files found in directory {dir_name}. Processing this as a failed download',
section,
)
status = 1
failed = 1
if (
'NZBOP_VERSION' in os.environ
and os.environ['NZBOP_VERSION'][0:5] >= '14.0'
@ -275,7 +269,7 @@ def process(
# Part of the refactor
if init_sickbeard.fork_obj:
init_sickbeard.fork_obj.initialize(
dir_name, input_name, failed, client_agent='manual',
dir_name, input_name, status, client_agent='manual',
)
# configure SB params to pass
@ -289,9 +283,9 @@ def process(
for param in copy.copy(fork_params):
if param == 'failed':
if failed > 1:
failed = 1
fork_params[param] = failed
if status > 1:
status = 1
fork_params[param] = status
if 'proc_type' in fork_params:
del fork_params['proc_type']
if 'type' in fork_params:
@ -437,7 +431,6 @@ def process(
}
if not download_id:
data.pop('downloadClientId')
data = json.dumps(data)
url = core.utils.common.create_url(scheme, host, port, route)
try:
if section == 'SickBeard':
@ -517,7 +510,7 @@ def process(
logger.debug(f'Opening URL: {url} with data: {data}', section)
r = requests.post(
url,
data=data,
data=json.dumps(data),
headers=headers,
stream=True,
verify=False,

View file

@ -90,10 +90,10 @@ class Section(configobj.Section):
class ConfigObj(configobj.ConfigObj, Section):
def __init__(self, *args, **kw):
if len(args) == 0:
args = (core.CONFIG_FILE,)
super().__init__(*args, **kw)
def __init__(self, infile=None, *args, **kw):
if infile is None:
infile = core.CONFIG_FILE
super().__init__(os.fspath(infile), *args, **kw)
self.interpolation = False
@staticmethod
@ -115,7 +115,7 @@ class ConfigObj(configobj.ConfigObj, Section):
try:
# check for autoProcessMedia.cfg and create if it does not exist
if not os.path.isfile(core.CONFIG_FILE):
if not core.CONFIG_FILE.is_file():
shutil.copyfile(core.CONFIG_SPEC_FILE, core.CONFIG_FILE)
CFG_OLD = config(core.CONFIG_FILE)
except Exception as error:
@ -123,7 +123,7 @@ class ConfigObj(configobj.ConfigObj, Section):
try:
# check for autoProcessMedia.cfg.spec and create if it does not exist
if not os.path.isfile(core.CONFIG_SPEC_FILE):
if not core.CONFIG_SPEC_FILE.is_file():
shutil.copyfile(core.CONFIG_FILE, core.CONFIG_SPEC_FILE)
CFG_NEW = config(core.CONFIG_SPEC_FILE)
except Exception as error:

View file

@ -3,6 +3,7 @@ from __future__ import annotations
import errno
import json
import os
import pathlib
import platform
import re
import shutil
@ -19,9 +20,8 @@ from core.utils.paths import make_dir
__author__ = 'Justin'
def is_video_good(videofile, status, require_lan=None):
file_name_ext = os.path.basename(videofile)
file_name, file_ext = os.path.splitext(file_name_ext)
def is_video_good(video: pathlib.Path, status, require_lan=None):
file_ext = video.suffix
disable = False
if (
file_ext not in core.MEDIA_CONTAINER
@ -57,26 +57,26 @@ def is_video_good(videofile, status, require_lan=None):
'TRANSCODER',
)
if disable:
if (
status
): # if the download was 'failed', assume bad. If it was successful, assume good.
if status:
# if the download was 'failed', assume bad.
# If it was successful, assume good.
return False
else:
return True
logger.info(
f'Checking [{file_name_ext}] for corruption, please stand by ...',
f'Checking [{video.name}] for corruption, please stand by ...',
'TRANSCODER',
)
video_details, result = get_video_details(videofile)
video_details, result = get_video_details(video)
if result != 0:
logger.error(f'FAILED: [{file_name_ext}] is corrupted!', 'TRANSCODER')
logger.error(f'FAILED: [{video.name}] is corrupted!', 'TRANSCODER')
return False
if video_details.get('error'):
error_details = video_details.get('error')
logger.info(
f'FAILED: [{file_name_ext}] returned error [{error_details}].',
f'FAILED: [{video.name}] returned error [{error_details}].',
'TRANSCODER',
)
return False
@ -103,12 +103,12 @@ def is_video_good(videofile, status, require_lan=None):
valid_audio = audio_streams
if len(video_streams) > 0 and len(valid_audio) > 0:
logger.info(
f'SUCCESS: [{file_name_ext}] has no corruption.', 'TRANSCODER',
f'SUCCESS: [{video.name}] has no corruption.', 'TRANSCODER',
)
return True
else:
logger.info(
f'FAILED: [{file_name_ext}] has {len(video_streams)} video streams and {len(audio_streams)} audio streams. Assume corruption.',
f'FAILED: [{video.name}] has {len(video_streams)} video streams and {len(audio_streams)} audio streams. Assume corruption.',
'TRANSCODER',
)
return False

View file

@ -277,9 +277,7 @@ def backup_versioned_file(old_file, version):
try:
logger.log(
'Trying to back up {old} to {new]'.format(
old=old_file, new=new_file,
),
f'Trying to back up {old_file} to {new_file}',
logger.DEBUG,
)
shutil.copy(old_file, new_file)

View file

@ -4,6 +4,7 @@ import os
import socket
import subprocess
import sys
import typing
import core
from core import APP_FILENAME
@ -20,9 +21,9 @@ if os.name == 'nt':
class WindowsProcess:
def __init__(self):
self.mutex = None
self.mutexname = 'nzbtomedia_{pid}'.format(
pid=core.PID_FILE.replace('\\', '/'),
) # {D0E858DF-985E-4907-B7FB-8D732C3FC3B9}'
# {D0E858DF-985E-4907-B7FB-8D732C3FC3B9}
_path_str = os.fspath(core.PID_FILE).replace('\\', '/')
self.mutexname = f'nzbtomedia_{_path_str}'
self.CreateMutex = CreateMutex
self.CloseHandle = CloseHandle
self.GetLastError = GetLastError
@ -79,25 +80,21 @@ class PosixProcess:
if not self.lasterror:
# Write my pid into pidFile to keep multiple copies of program from running
try:
fp = open(self.pidpath, 'w')
fp.write(str(os.getpid()))
fp.close()
except Exception:
pass
with self.pidpath.open(mode='w') as fp:
fp.write(os.getpid())
return self.lasterror
def __del__(self):
if not self.lasterror:
if self.lock_socket:
self.lock_socket.close()
if os.path.isfile(self.pidpath):
os.unlink(self.pidpath)
if self.pidpath.is_file():
self.pidpath.unlink()
ProcessType = typing.Type[typing.Union[PosixProcess, WindowsProcess]]
if os.name == 'nt':
RunningProcess = WindowsProcess
RunningProcess: ProcessType = WindowsProcess
else:
RunningProcess = PosixProcess