diff --git a/src/base/search/searchpluginmanager.cpp b/src/base/search/searchpluginmanager.cpp index 095e3d57a..2e77b3403 100644 --- a/src/base/search/searchpluginmanager.cpp +++ b/src/base/search/searchpluginmanager.cpp @@ -367,14 +367,14 @@ QString SearchPluginManager::categoryFullName(const QString &categoryName) const QHash categoryTable { {u"all"_s, tr("All categories")}, - {u"movies"_s, tr("Movies")}, - {u"tv"_s, tr("TV shows")}, - {u"music"_s, tr("Music")}, - {u"games"_s, tr("Games")}, {u"anime"_s, tr("Anime")}, - {u"software"_s, tr("Software")}, + {u"books"_s, tr("Books")}, + {u"games"_s, tr("Games")}, + {u"movies"_s, tr("Movies")}, + {u"music"_s, tr("Music")}, {u"pictures"_s, tr("Pictures")}, - {u"books"_s, tr("Books")} + {u"software"_s, tr("Software")}, + {u"tv"_s, tr("TV shows")} }; return categoryTable.value(categoryName); } diff --git a/src/searchengine/nova3/helpers.py b/src/searchengine/nova3/helpers.py index f0206e383..60c4370e9 100644 --- a/src/searchengine/nova3/helpers.py +++ b/src/searchengine/nova3/helpers.py @@ -1,4 +1,4 @@ -#VERSION: 1.47 +#VERSION: 1.48 # Author: # Christophe DUMEZ (chris@qbittorrent.org) @@ -35,12 +35,12 @@ import os import re import socket import socks +import sys import tempfile import urllib.error -import urllib.parse import urllib.request from collections.abc import Mapping -from typing import Any, Dict, Optional +from typing import Any, Optional def getBrowserUserAgent() -> str: @@ -59,7 +59,7 @@ def getBrowserUserAgent() -> str: return f"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:{nowVersion}.0) Gecko/20100101 Firefox/{nowVersion}.0" -headers: Dict[str, Any] = {'User-Agent': getBrowserUserAgent()} +headers: dict[str, Any] = {'User-Agent': getBrowserUserAgent()} # SOCKS5 Proxy support if "sock_proxy" in os.environ and len(os.environ["sock_proxy"].strip()) > 0: @@ -91,51 +91,52 @@ def htmlentitydecode(s: str) -> str: def retrieve_url(url: str, custom_headers: Mapping[str, Any] = {}) -> str: """ Return the content of the url page as a string """ - req = urllib.request.Request(url, headers={**headers, **custom_headers}) + + request = urllib.request.Request(url, headers={**headers, **custom_headers}) try: - response = urllib.request.urlopen(req) + response = urllib.request.urlopen(request) except urllib.error.URLError as errno: - print(" ".join(("Connection error:", str(errno.reason)))) + print(f"Connection error: {errno.reason}", file=sys.stderr) return "" - dat: bytes = response.read() + data: bytes = response.read() + # Check if it is gzipped - if dat[:2] == b'\x1f\x8b': + if data[:2] == b'\x1f\x8b': # Data is gzip encoded, decode it - compressedstream = io.BytesIO(dat) - gzipper = gzip.GzipFile(fileobj=compressedstream) - extracted_data = gzipper.read() - dat = extracted_data - info = response.info() + with io.BytesIO(data) as compressedStream, gzip.GzipFile(fileobj=compressedStream) as gzipper: + data = gzipper.read() + charset = 'utf-8' try: - ignore, charset = info['Content-Type'].split('charset=') - except Exception: + charset = response.getheader('Content-Type', '').split('charset=', 1)[1] + except IndexError: pass - datStr = dat.decode(charset, 'replace') - datStr = htmlentitydecode(datStr) - return datStr + + dataStr = data.decode(charset, 'replace') + dataStr = htmlentitydecode(dataStr) + return dataStr def download_file(url: str, referer: Optional[str] = None) -> str: """ Download file at url and write it to a file, return the path to the file and the url """ - fileHandle, path = tempfile.mkstemp() - file = os.fdopen(fileHandle, "wb") + # Download url - req = urllib.request.Request(url, headers=headers) + request = urllib.request.Request(url, headers=headers) if referer is not None: - req.add_header('referer', referer) - response = urllib.request.urlopen(req) - dat = response.read() + request.add_header('referer', referer) + response = urllib.request.urlopen(request) + data = response.read() + # Check if it is gzipped - if dat[:2] == b'\x1f\x8b': + if data[:2] == b'\x1f\x8b': # Data is gzip encoded, decode it - compressedstream = io.BytesIO(dat) - gzipper = gzip.GzipFile(fileobj=compressedstream) - extracted_data = gzipper.read() - dat = extracted_data + with io.BytesIO(data) as compressedStream, gzip.GzipFile(fileobj=compressedStream) as gzipper: + data = gzipper.read() # Write it to a file - file.write(dat) - file.close() + fileHandle, path = tempfile.mkstemp() + with os.fdopen(fileHandle, "wb") as file: + file.write(data) + # return file path - return (path + " " + url) + return f"{path} {url}" diff --git a/src/searchengine/nova3/nova2.py b/src/searchengine/nova3/nova2.py index 9db438b96..bb6381f90 100644 --- a/src/searchengine/nova3/nova2.py +++ b/src/searchengine/nova3/nova2.py @@ -1,4 +1,4 @@ -#VERSION: 1.46 +#VERSION: 1.47 # Author: # Fabien Devaux @@ -36,13 +36,15 @@ import importlib import pathlib import sys +import traceback import urllib.parse -from collections.abc import Iterable, Iterator, Sequence +import xml.etree.ElementTree as ET +from collections.abc import Iterable from enum import Enum from glob import glob from multiprocessing import Pool, cpu_count from os import path -from typing import Dict, List, Optional, Set, Tuple, Type +from typing import Optional THREADED: bool = True try: @@ -50,7 +52,7 @@ try: except NotImplementedError: MAX_THREADS = 1 -Category = Enum('Category', ['all', 'movies', 'tv', 'music', 'games', 'anime', 'software', 'pictures', 'books']) +Category = Enum('Category', ['all', 'anime', 'books', 'games', 'movies', 'music', 'pictures', 'software', 'tv']) ################################################################################ @@ -62,13 +64,13 @@ Category = Enum('Category', ['all', 'movies', 'tv', 'music', 'games', 'anime', ' ################################################################################ -EngineName = str +EngineModuleName = str # the filename of the engine plugin class Engine: url: str - name: EngineName - supported_categories: Dict[str, str] + name: str + supported_categories: dict[str, str] def __init__(self) -> None: pass @@ -81,112 +83,89 @@ class Engine: # global state -engine_dict: Dict[EngineName, Optional[Type[Engine]]] = {} +engine_dict: dict[EngineModuleName, Optional[type[Engine]]] = {} -def list_engines() -> List[EngineName]: +def list_engines() -> list[EngineModuleName]: """ List all engines, - including broken engines that fail on import + including broken engines that would fail on import - Faster than initialize_engines - - Return list of all engines + Return list of all engines' module name """ - found_engines = [] + + names = [] for engine_path in glob(path.join(path.dirname(__file__), 'engines', '*.py')): - engine_name = path.basename(engine_path).split('.')[0].strip() - if len(engine_name) == 0 or engine_name.startswith('_'): + engine_module_name = path.basename(engine_path).split('.')[0].strip() + if len(engine_module_name) == 0 or engine_module_name.startswith('_'): continue - found_engines.append(engine_name) + names.append(engine_module_name) - return found_engines + return sorted(names) -def get_engine(engine_name: EngineName) -> Optional[Type[Engine]]: - if engine_name in engine_dict: - return engine_dict[engine_name] +def import_engine(engine_module_name: EngineModuleName) -> Optional[type[Engine]]: + if engine_module_name in engine_dict: + return engine_dict[engine_module_name] - # when import fails, engine is None - engine = None + # when import fails, return `None` + engine_class = None try: - # import engines.[engine] - engine_module = importlib.import_module("engines." + engine_name) - engine = getattr(engine_module, engine_name) + # import engines.[engine_module_name] + engine_module = importlib.import_module(f"engines.{engine_module_name}") + engine_class = getattr(engine_module, engine_module_name) except Exception: pass - engine_dict[engine_name] = engine - return engine + + engine_dict[engine_module_name] = engine_class + return engine_class -def initialize_engines(found_engines: Iterable[EngineName]) -> Set[EngineName]: - """ Import available engines - - Return set of available engines +def get_capabilities(engines: Iterable[EngineModuleName]) -> str: """ - supported_engines = set() - - for engine_name in found_engines: - # import engine - engine = get_engine(engine_name) - if engine is None: - continue - supported_engines.add(engine_name) - - return supported_engines - - -def engines_to_xml(supported_engines: Iterable[EngineName]) -> Iterator[str]: - """ Generates xml for supported engines """ - tab = " " * 4 - - for engine_name in supported_engines: - search_engine = get_engine(engine_name) - if search_engine is None: - continue - - supported_categories = "" - if hasattr(search_engine, "supported_categories"): - supported_categories = " ".join((key - for key in search_engine.supported_categories.keys() - if key != Category.all.name)) - - yield "".join((tab, "<", engine_name, ">\n", - tab, tab, "", search_engine.name, "\n", - tab, tab, "", search_engine.url, "\n", - tab, tab, "", supported_categories, "\n", - tab, "\n")) - - -def displayCapabilities(supported_engines: Iterable[EngineName]) -> None: - """ - Display capabilities in XML format + Return capabilities in XML format - + long name http://example.com movies music games - + """ - xml = "".join(("\n", - "".join(engines_to_xml(supported_engines)), - "")) - print(xml) + + capabilities_element = ET.Element('capabilities') + + for engine_module_name in engines: + engine_class = import_engine(engine_module_name) + if engine_class is None: + continue + + engine_module_element = ET.SubElement(capabilities_element, engine_module_name) + + ET.SubElement(engine_module_element, 'name').text = engine_class.name + ET.SubElement(engine_module_element, 'url').text = engine_class.url + + supported_categories = "" + if hasattr(engine_class, "supported_categories"): + supported_categories = " ".join((key + for key in sorted(engine_class.supported_categories.keys()) + if key != Category.all.name)) + ET.SubElement(engine_module_element, 'categories').text = supported_categories + + ET.indent(capabilities_element) + return ET.tostring(capabilities_element, 'unicode') -def run_search(engine_list: Tuple[Optional[Type[Engine]], str, Category]) -> bool: +def run_search(search_params: tuple[type[Engine], str, Category]) -> bool: """ Run search in engine - @param engine_list Tuple with engine, query and category + @param search_params Tuple with engine, query and category @retval False if any exceptions occurred @retval True otherwise """ - engine_class, what, cat = engine_list - if engine_class is None: - return False + engine_class, what, cat = search_params try: engine = engine_class() # avoid exceptions due to invalid category @@ -195,73 +174,65 @@ def run_search(engine_list: Tuple[Optional[Type[Engine]], str, Category]) -> boo engine.search(what, cat.name) else: engine.search(what) - return True except Exception: + traceback.print_exc() return False -def main(args: Sequence[str]) -> None: - # qbt tend to run this script in 'isolate mode' so append the current path manually - current_path = str(pathlib.Path(__file__).parent.resolve()) - if current_path not in sys.path: - sys.path.append(current_path) - - found_engines = list_engines() - - def show_usage() -> None: - print("./nova2.py all|engine1[,engine2]* ", file=sys.stderr) - print("found engines: " + ','.join(found_engines), file=sys.stderr) - print("to list available engines: ./nova2.py --capabilities [--names]", file=sys.stderr) - - if not args: - show_usage() - sys.exit(1) - elif args[0] == "--capabilities": - supported_engines = initialize_engines(found_engines) - if "--names" in args: - print(",".join(supported_engines)) - return - displayCapabilities(supported_engines) - return - elif len(args) < 3: - show_usage() - sys.exit(1) - - cat = args[1].lower() - try: - category = Category[cat] - except KeyError: - print(" - ".join(('Invalid category', cat)), file=sys.stderr) - sys.exit(1) - - # get only unique engines with set - engines_list = set(e.lower() for e in args[0].strip().split(',')) - - if not engines_list: - # engine list is empty. Nothing to do here - return - - if 'all' in engines_list: - # use all supported engines - # note: this can be slower than passing a list of supported engines - # because initialize_engines will also try to import not-supported engines - engines_list = initialize_engines(found_engines) - else: - # discard not-found engines - engines_list = {engine for engine in engines_list if engine in found_engines} - - what = urllib.parse.quote(' '.join(args[2:])) - params = ((get_engine(engine_name), what, category) for engine_name in engines_list) - - if THREADED: - # child process spawning is controlled min(number of searches, number of cpu) - with Pool(min(len(engines_list), MAX_THREADS)) as pool: - pool.map(run_search, params) - else: - # py3 note: map is needed to be evaluated for content to be executed - all(map(run_search, params)) - - if __name__ == "__main__": - main(sys.argv[1:]) + def main() -> int: + # qbt tend to run this script in 'isolate mode' so append the current path manually + current_path = str(pathlib.Path(__file__).parent.resolve()) + if current_path not in sys.path: + sys.path.append(current_path) + + # https://docs.python.org/3/library/sys.html#sys.exit + class ExitCode(Enum): + OK = 0 + AppError = 1 + ArgError = 2 + + found_engines = list_engines() + + prog_name = sys.argv[0] + prog_usage = (f"Usage: {prog_name} all|engine1[,engine2]* \n" + f"To list available engines: {prog_name} --capabilities [--names]\n" + f"Found engines: {','.join(found_engines)}") + + if "--capabilities" in sys.argv: + if "--names" in sys.argv: + print(",".join((e for e in found_engines if import_engine(e) is not None))) + return ExitCode.OK.value + + print(get_capabilities(found_engines)) + return ExitCode.OK.value + elif len(sys.argv) < 4: + print(prog_usage, file=sys.stderr) + return ExitCode.ArgError.value + + # get unique engines + engs = set(arg.strip().lower() for arg in sys.argv[1].split(',')) + engines = found_engines if 'all' in engs else [e for e in found_engines if e in engs] + + cat = sys.argv[2].lower() + try: + category = Category[cat] + except KeyError: + print(f"Invalid category: {cat}", file=sys.stderr) + return ExitCode.ArgError.value + + what = urllib.parse.quote(' '.join(sys.argv[3:])) + params = ((engine_class, what, category) for e in engines if (engine_class := import_engine(e)) is not None) + + search_success = False + if THREADED: + processes = max(min(len(engines), MAX_THREADS), 1) + with Pool(processes) as pool: + search_success = all(pool.map(run_search, params)) + else: + search_success = all(map(run_search, params)) + + return ExitCode.OK.value if search_success else ExitCode.AppError.value + + sys.exit(main()) diff --git a/src/searchengine/nova3/novaprinter.py b/src/searchengine/nova3/novaprinter.py index f4c9dcbb0..2c52db7c0 100644 --- a/src/searchengine/nova3/novaprinter.py +++ b/src/searchengine/nova3/novaprinter.py @@ -1,4 +1,4 @@ -#VERSION: 1.50 +#VERSION: 1.51 # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: @@ -25,21 +25,18 @@ # POSSIBILITY OF SUCH DAMAGE. import re -from collections.abc import Mapping -from typing import Any, Union +from typing import TypedDict, Union -# TODO: enable the following when using Python >= 3.8 -#SearchResults = TypedDict('SearchResults', { -# 'link': str, -# 'name': str, -# 'size': Union[float, int, str], -# 'seeds': int, -# 'leech': int, -# 'engine_url': str, -# 'desc_link': str, # Optional # TODO: use `NotRequired[str]` when using Python >= 3.11 -# 'pub_date': int # Optional # TODO: use `NotRequired[int]` when using Python >= 3.11 -#}) -SearchResults = Mapping[str, Any] +SearchResults = TypedDict('SearchResults', { + 'link': str, + 'name': str, + 'size': Union[float, int, str], # TODO: use `float | int | str` when using Python >= 3.10 + 'seeds': int, + 'leech': int, + 'engine_url': str, + 'desc_link': str, # Optional # TODO: use `NotRequired[str]` when using Python >= 3.11 + 'pub_date': int # Optional # TODO: use `NotRequired[int]` when using Python >= 3.11 +}) def prettyPrinter(dictionary: SearchResults) -> None: @@ -62,6 +59,7 @@ def prettyPrinter(dictionary: SearchResults) -> None: sizeUnitRegex: re.Pattern[str] = re.compile(r"^(?P\d*\.?\d+) *(?P[a-z]+)?", re.IGNORECASE) +# TODO: use `float | int | str` when using Python >= 3.10 def anySizeToBytes(size_string: Union[float, int, str]) -> int: """ Convert a string like '1 KB' to '1024' (bytes)