mirror of
https://github.com/hay-kot/mealie.git
synced 2025-08-22 14:33:33 -07:00
refactor/split reciper scraper into seperate fiels
This commit is contained in:
parent
9e91f05df8
commit
cb22c58ff3
8 changed files with 290 additions and 260 deletions
|
@ -1,10 +1,11 @@
|
||||||
from db.db_setup import generate_session
|
from db.db_setup import generate_session
|
||||||
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query
|
from fastapi import APIRouter, Depends, File, Form, HTTPException
|
||||||
|
from fastapi.logger import logger
|
||||||
from fastapi.responses import FileResponse
|
from fastapi.responses import FileResponse
|
||||||
from models.recipe_models import RecipeURLIn
|
from models.recipe_models import RecipeURLIn
|
||||||
from services.image_services import read_image, write_image
|
from services.image_services import read_image, write_image
|
||||||
from services.recipe_services import Recipe
|
from services.recipe_services import Recipe
|
||||||
from services.scrape_services import create_from_url
|
from services.scraper.scraper import create_from_url
|
||||||
from sqlalchemy.orm.session import Session
|
from sqlalchemy.orm.session import Session
|
||||||
from utils.snackbar import SnackResponse
|
from utils.snackbar import SnackResponse
|
||||||
|
|
||||||
|
@ -27,6 +28,7 @@ def parse_recipe_url(url: RecipeURLIn, db: Session = Depends(generate_session)):
|
||||||
""" Takes in a URL and attempts to scrape data and load it into the database """
|
""" Takes in a URL and attempts to scrape data and load it into the database """
|
||||||
|
|
||||||
recipe = create_from_url(url.url)
|
recipe = create_from_url(url.url)
|
||||||
|
|
||||||
recipe.save_to_db(db)
|
recipe.save_to_db(db)
|
||||||
|
|
||||||
return recipe.slug
|
return recipe.slug
|
||||||
|
|
|
@ -6,7 +6,7 @@ from pathlib import Path
|
||||||
|
|
||||||
from app_config import IMG_DIR, MIGRATION_DIR, TEMP_DIR
|
from app_config import IMG_DIR, MIGRATION_DIR, TEMP_DIR
|
||||||
from services.recipe_services import Recipe
|
from services.recipe_services import Recipe
|
||||||
from services.scrape_services import normalize_data, process_recipe_data
|
from services.scraper.cleaner import Cleaner
|
||||||
from app_config import IMG_DIR, TEMP_DIR
|
from app_config import IMG_DIR, TEMP_DIR
|
||||||
|
|
||||||
|
|
||||||
|
@ -34,8 +34,7 @@ def import_recipes(recipe_dir: Path) -> Recipe:
|
||||||
with open(recipe_file, "r") as f:
|
with open(recipe_file, "r") as f:
|
||||||
recipe_dict = json.loads(f.read())
|
recipe_dict = json.loads(f.read())
|
||||||
|
|
||||||
recipe_dict = process_recipe_data(recipe_dict)
|
recipe_data = Cleaner.clean(recipe_dict)
|
||||||
recipe_data = normalize_data(recipe_dict)
|
|
||||||
|
|
||||||
image_name = None
|
image_name = None
|
||||||
if image:
|
if image:
|
||||||
|
|
|
@ -38,8 +38,8 @@ class Recipe(BaseModel):
|
||||||
tags: Optional[List[str]] = []
|
tags: Optional[List[str]] = []
|
||||||
dateAdded: Optional[datetime.date]
|
dateAdded: Optional[datetime.date]
|
||||||
notes: Optional[List[RecipeNote]] = []
|
notes: Optional[List[RecipeNote]] = []
|
||||||
rating: Optional[int]
|
rating: Optional[int] = 0
|
||||||
orgURL: Optional[str]
|
orgURL: Optional[str] = ""
|
||||||
extras: Optional[dict] = {}
|
extras: Optional[dict] = {}
|
||||||
|
|
||||||
class Config:
|
class Config:
|
||||||
|
|
|
@ -1,246 +0,0 @@
|
||||||
import html
|
|
||||||
import json
|
|
||||||
import re
|
|
||||||
from typing import List, Tuple
|
|
||||||
|
|
||||||
import extruct
|
|
||||||
import requests
|
|
||||||
import scrape_schema_recipe
|
|
||||||
from app_config import DEBUG_DIR
|
|
||||||
from slugify import slugify
|
|
||||||
from utils.logger import logger
|
|
||||||
from w3lib.html import get_base_url
|
|
||||||
|
|
||||||
from services.image_services import scrape_image
|
|
||||||
from services.recipe_services import Recipe
|
|
||||||
|
|
||||||
LAST_JSON = DEBUG_DIR.joinpath("last_recipe.json")
|
|
||||||
|
|
||||||
|
|
||||||
def cleanhtml(raw_html):
|
|
||||||
cleanr = re.compile("<.*?>")
|
|
||||||
cleantext = re.sub(cleanr, "", raw_html)
|
|
||||||
return cleantext
|
|
||||||
|
|
||||||
|
|
||||||
def normalize_image_url(image) -> str:
|
|
||||||
if type(image) == list:
|
|
||||||
return image[0]
|
|
||||||
elif type(image) == dict:
|
|
||||||
return image["url"]
|
|
||||||
elif type(image) == str:
|
|
||||||
return image
|
|
||||||
else:
|
|
||||||
raise Exception(f"Unrecognised image URL format: {image}")
|
|
||||||
|
|
||||||
|
|
||||||
def normalize_instructions(instructions) -> List[dict]:
|
|
||||||
if not instructions:
|
|
||||||
return []
|
|
||||||
|
|
||||||
# One long string split by (possibly multiple) new lines
|
|
||||||
if type(instructions) == str:
|
|
||||||
return [
|
|
||||||
{"text": normalize_instruction(line)}
|
|
||||||
for line in instructions.splitlines()
|
|
||||||
if line
|
|
||||||
]
|
|
||||||
|
|
||||||
# Plain strings in a list
|
|
||||||
elif type(instructions) == list and type(instructions[0]) == str:
|
|
||||||
return [{"text": normalize_instruction(step)} for step in instructions]
|
|
||||||
|
|
||||||
# Dictionaries (let's assume it's a HowToStep) in a list
|
|
||||||
elif type(instructions) == list and type(instructions[0]) == dict:
|
|
||||||
try:
|
|
||||||
# If HowToStep is under HowToSection
|
|
||||||
sectionSteps = []
|
|
||||||
for step in instructions:
|
|
||||||
if step["@type"] == "HowToSection":
|
|
||||||
for item in step["itemListElement"]:
|
|
||||||
sectionSteps.append(item)
|
|
||||||
|
|
||||||
if len(sectionSteps) > 0:
|
|
||||||
return [
|
|
||||||
{"text": normalize_instruction(step["text"])}
|
|
||||||
for step in sectionSteps
|
|
||||||
if step["@type"] == "HowToStep"
|
|
||||||
]
|
|
||||||
|
|
||||||
return [
|
|
||||||
{"text": normalize_instruction(step["text"])}
|
|
||||||
for step in instructions
|
|
||||||
if step["@type"] == "HowToStep"
|
|
||||||
]
|
|
||||||
except Exception as e:
|
|
||||||
# Not "@type", try "type"
|
|
||||||
return [
|
|
||||||
{"text": normalize_instruction(step["properties"]["text"])}
|
|
||||||
for step in instructions
|
|
||||||
if step["type"].find("HowToStep") > -1
|
|
||||||
]
|
|
||||||
|
|
||||||
else:
|
|
||||||
raise Exception(f"Unrecognised instruction format: {instructions}")
|
|
||||||
|
|
||||||
|
|
||||||
def normalize_instruction(line) -> str:
|
|
||||||
l = cleanhtml(line.strip())
|
|
||||||
# Some sites erroneously escape their strings on multiple levels
|
|
||||||
while not l == (l := html.unescape(l)):
|
|
||||||
pass
|
|
||||||
return l
|
|
||||||
|
|
||||||
|
|
||||||
def normalize_ingredient(ingredients: list) -> str:
|
|
||||||
|
|
||||||
return [cleanhtml(html.unescape(ing)) for ing in ingredients]
|
|
||||||
|
|
||||||
|
|
||||||
def normalize_yield(yld) -> str:
|
|
||||||
if type(yld) == list:
|
|
||||||
return yld[-1]
|
|
||||||
else:
|
|
||||||
return yld
|
|
||||||
|
|
||||||
|
|
||||||
def normalize_time(time_entry) -> str:
|
|
||||||
if type(time_entry) == type(None):
|
|
||||||
return None
|
|
||||||
elif type(time_entry) != str:
|
|
||||||
return str(time_entry)
|
|
||||||
|
|
||||||
|
|
||||||
def normalize_data(recipe_data: dict) -> dict:
|
|
||||||
recipe_data["totalTime"] = normalize_time(recipe_data.get("totalTime"))
|
|
||||||
recipe_data["description"] = cleanhtml(recipe_data.get("description", ""))
|
|
||||||
recipe_data["prepTime"] = normalize_time(recipe_data.get("prepTime"))
|
|
||||||
recipe_data["performTime"] = normalize_time(recipe_data.get("performTime"))
|
|
||||||
recipe_data["recipeYield"] = normalize_yield(recipe_data.get("recipeYield"))
|
|
||||||
recipe_data["recipeIngredient"] = normalize_ingredient(
|
|
||||||
recipe_data.get("recipeIngredient")
|
|
||||||
)
|
|
||||||
recipe_data["recipeInstructions"] = normalize_instructions(
|
|
||||||
recipe_data["recipeInstructions"]
|
|
||||||
)
|
|
||||||
recipe_data["image"] = normalize_image_url(recipe_data["image"])
|
|
||||||
return recipe_data
|
|
||||||
|
|
||||||
|
|
||||||
def process_recipe_data(new_recipe: dict, url=None) -> dict:
|
|
||||||
slug = slugify(new_recipe["name"])
|
|
||||||
mealie_tags = {
|
|
||||||
"slug": slug,
|
|
||||||
"orgURL": url,
|
|
||||||
"categories": [],
|
|
||||||
"tags": [],
|
|
||||||
"dateAdded": None,
|
|
||||||
"notes": [],
|
|
||||||
"extras": [],
|
|
||||||
}
|
|
||||||
|
|
||||||
new_recipe.update(mealie_tags)
|
|
||||||
|
|
||||||
return new_recipe
|
|
||||||
|
|
||||||
|
|
||||||
def extract_recipe_from_html(html: str, url: str) -> dict:
|
|
||||||
try:
|
|
||||||
scraped_recipes: List[dict] = scrape_schema_recipe.loads(
|
|
||||||
html, python_objects=True
|
|
||||||
)
|
|
||||||
dump_last_json(scraped_recipes)
|
|
||||||
|
|
||||||
if not scraped_recipes:
|
|
||||||
scraped_recipes: List[dict] = scrape_schema_recipe.scrape_url(
|
|
||||||
url, python_objects=True
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
# trying without python_objects
|
|
||||||
scraped_recipes: List[dict] = scrape_schema_recipe.loads(html)
|
|
||||||
dump_last_json(scraped_recipes)
|
|
||||||
|
|
||||||
if not scraped_recipes:
|
|
||||||
scraped_recipes: List[dict] = scrape_schema_recipe.scrape_url(url)
|
|
||||||
|
|
||||||
if scraped_recipes:
|
|
||||||
new_recipe: dict = scraped_recipes[0]
|
|
||||||
logger.info(f"Recipe Scraped From Web: {new_recipe}")
|
|
||||||
|
|
||||||
if not new_recipe:
|
|
||||||
return "fail" # TODO: Return Better Error Here
|
|
||||||
|
|
||||||
new_recipe = process_recipe_data(new_recipe, url=url)
|
|
||||||
new_recipe = normalize_data(new_recipe)
|
|
||||||
else:
|
|
||||||
new_recipe = basic_recipe_from_opengraph(html, url)
|
|
||||||
logger.info(f"Recipe Scraped from opengraph metadata: {new_recipe}")
|
|
||||||
|
|
||||||
return new_recipe
|
|
||||||
|
|
||||||
|
|
||||||
def download_image_for_recipe(recipe: dict) -> dict:
|
|
||||||
try:
|
|
||||||
img_path = scrape_image(recipe.get("image"), recipe.get("slug"))
|
|
||||||
recipe["image"] = img_path.name
|
|
||||||
except:
|
|
||||||
recipe["image"] = "no image"
|
|
||||||
|
|
||||||
return recipe
|
|
||||||
|
|
||||||
|
|
||||||
def og_field(properties: dict, field_name: str) -> str:
|
|
||||||
return next((val for name, val in properties if name == field_name), None)
|
|
||||||
|
|
||||||
|
|
||||||
def og_fields(properties: List[Tuple[str, str]], field_name: str) -> List[str]:
|
|
||||||
return list({val for name, val in properties if name == field_name})
|
|
||||||
|
|
||||||
|
|
||||||
def basic_recipe_from_opengraph(html: str, url: str) -> dict:
|
|
||||||
base_url = get_base_url(html, url)
|
|
||||||
data = extruct.extract(html, base_url=base_url)
|
|
||||||
try:
|
|
||||||
properties = data["opengraph"][0]["properties"]
|
|
||||||
except:
|
|
||||||
return
|
|
||||||
|
|
||||||
return {
|
|
||||||
"name": og_field(properties, "og:title"),
|
|
||||||
"description": og_field(properties, "og:description"),
|
|
||||||
"image": og_field(properties, "og:image"),
|
|
||||||
"recipeYield": "",
|
|
||||||
# FIXME: If recipeIngredient is an empty list, mongodb's data verification fails.
|
|
||||||
"recipeIngredient": ["Could not detect ingredients"],
|
|
||||||
# FIXME: recipeInstructions is allowed to be empty but message this is added for user sanity.
|
|
||||||
"recipeInstructions": [{"text": "Could not detect instructions"}],
|
|
||||||
"slug": slugify(og_field(properties, "og:title")),
|
|
||||||
"orgURL": og_field(properties, "og:url"),
|
|
||||||
"categories": [],
|
|
||||||
"tags": og_fields(properties, "og:article:tag"),
|
|
||||||
"dateAdded": None,
|
|
||||||
"notes": [],
|
|
||||||
"extras": [],
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def dump_last_json(recipe_data: dict):
|
|
||||||
with open(LAST_JSON, "w") as f:
|
|
||||||
f.write(json.dumps(recipe_data, indent=4, default=str))
|
|
||||||
|
|
||||||
return
|
|
||||||
|
|
||||||
|
|
||||||
def process_recipe_url(url: str) -> dict:
|
|
||||||
r = requests.get(url)
|
|
||||||
new_recipe = extract_recipe_from_html(r.text, url)
|
|
||||||
new_recipe = download_image_for_recipe(new_recipe)
|
|
||||||
return new_recipe
|
|
||||||
|
|
||||||
|
|
||||||
def create_from_url(url: str) -> Recipe:
|
|
||||||
recipe_data = process_recipe_url(url)
|
|
||||||
|
|
||||||
recipe = Recipe(**recipe_data)
|
|
||||||
|
|
||||||
return recipe
|
|
151
mealie/services/scraper/cleaner.py
Normal file
151
mealie/services/scraper/cleaner.py
Normal file
|
@ -0,0 +1,151 @@
|
||||||
|
import html
|
||||||
|
import re
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from slugify import slugify
|
||||||
|
|
||||||
|
|
||||||
|
class Cleaner:
|
||||||
|
"""A Namespace for utility function to clean recipe data extracted
|
||||||
|
from a url and returns a dictionary that is ready for import into
|
||||||
|
the database. Cleaner.clean is the main entrypoint
|
||||||
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def clean(recipe_data: dict, url=None) -> dict:
|
||||||
|
print(recipe_data)
|
||||||
|
"""Main entrypoint to clean a recipe extracted from the web
|
||||||
|
and format the data into an accectable format for the database
|
||||||
|
|
||||||
|
Args:
|
||||||
|
recipe_data (dict): raw recipe dicitonary
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: cleaned recipe dictionary
|
||||||
|
"""
|
||||||
|
recipe_data["totalTime"] = Cleaner.time(recipe_data.get("totalTime"))
|
||||||
|
recipe_data["description"] = Cleaner.html(recipe_data.get("description", ""))
|
||||||
|
recipe_data["prepTime"] = Cleaner.time(recipe_data.get("prepTime"))
|
||||||
|
recipe_data["performTime"] = Cleaner.time(recipe_data.get("performTime"))
|
||||||
|
recipe_data["recipeYield"] = Cleaner.yield_amount(
|
||||||
|
recipe_data.get("recipeYield")
|
||||||
|
)
|
||||||
|
recipe_data["recipeIngredient"] = Cleaner.ingredient(
|
||||||
|
recipe_data.get("recipeIngredient")
|
||||||
|
)
|
||||||
|
recipe_data["recipeInstructions"] = Cleaner.instructions(
|
||||||
|
recipe_data["recipeInstructions"]
|
||||||
|
)
|
||||||
|
recipe_data["image"] = Cleaner.image(recipe_data["image"])
|
||||||
|
recipe_data["slug"] = slugify(recipe_data["name"])
|
||||||
|
recipe_data["orgURL"] = url
|
||||||
|
|
||||||
|
return recipe_data
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def html(raw_html):
|
||||||
|
cleanr = re.compile("<.*?>")
|
||||||
|
cleantext = re.sub(cleanr, "", raw_html)
|
||||||
|
return cleantext
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def image(image) -> str:
|
||||||
|
if type(image) == list:
|
||||||
|
return image[0]
|
||||||
|
elif type(image) == dict:
|
||||||
|
return image["url"]
|
||||||
|
elif type(image) == str:
|
||||||
|
return image
|
||||||
|
else:
|
||||||
|
raise Exception(f"Unrecognised image URL format: {image}")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def instructions(instructions) -> List[dict]:
|
||||||
|
if not instructions:
|
||||||
|
return []
|
||||||
|
|
||||||
|
# One long string split by (possibly multiple) new lines
|
||||||
|
print(instructions)
|
||||||
|
if type(instructions) == str:
|
||||||
|
return [
|
||||||
|
{"text": Cleaner._instruction(line)}
|
||||||
|
for line in instructions.splitlines()
|
||||||
|
if line
|
||||||
|
]
|
||||||
|
|
||||||
|
# Plain strings in a list
|
||||||
|
elif type(instructions) == list and type(instructions[0]) == str:
|
||||||
|
return [{"text": Cleaner._instruction(step)} for step in instructions]
|
||||||
|
|
||||||
|
# Dictionaries (let's assume it's a HowToStep) in a list
|
||||||
|
elif type(instructions) == list and type(instructions[0]) == dict:
|
||||||
|
# Try List of Dictionary without "@type" or "type"
|
||||||
|
if not instructions[0].get("@type", False) and not instructions[0].get(
|
||||||
|
"type", False
|
||||||
|
):
|
||||||
|
return [
|
||||||
|
{"text": Cleaner._instruction(step["text"])}
|
||||||
|
for step in instructions
|
||||||
|
]
|
||||||
|
|
||||||
|
try:
|
||||||
|
# If HowToStep is under HowToSection
|
||||||
|
sectionSteps = []
|
||||||
|
for step in instructions:
|
||||||
|
if step["@type"] == "HowToSection":
|
||||||
|
[sectionSteps.append(item) for item in step["itemListELement"]]
|
||||||
|
|
||||||
|
if len(sectionSteps) > 0:
|
||||||
|
return [
|
||||||
|
{"text": Cleaner._instruction(step["text"])}
|
||||||
|
for step in sectionSteps
|
||||||
|
if step["@type"] == "HowToStep"
|
||||||
|
]
|
||||||
|
|
||||||
|
return [
|
||||||
|
{"text": Cleaner._instruction(step["text"])}
|
||||||
|
for step in instructions
|
||||||
|
if step["@type"] == "HowToStep"
|
||||||
|
]
|
||||||
|
except Exception as e:
|
||||||
|
# Not "@type", try "type"
|
||||||
|
try:
|
||||||
|
return [
|
||||||
|
{"text": Cleaner._instruction(step["properties"]["text"])}
|
||||||
|
for step in instructions
|
||||||
|
if step["type"].find("HowToStep") > -1
|
||||||
|
]
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise Exception(f"Unrecognised instruction format: {instructions}")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _instruction(line) -> str:
|
||||||
|
l = Cleaner.html(line.strip())
|
||||||
|
# Some sites erroneously escape their strings on multiple levels
|
||||||
|
while not l == (l := html.unescape(l)):
|
||||||
|
pass
|
||||||
|
return l
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def ingredient(ingredients: list) -> str:
|
||||||
|
|
||||||
|
return [Cleaner.html(html.unescape(ing)) for ing in ingredients]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def yield_amount(yld) -> str:
|
||||||
|
if type(yld) == list:
|
||||||
|
return yld[-1]
|
||||||
|
else:
|
||||||
|
return yld
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def time(time_entry) -> str:
|
||||||
|
if type(time_entry) == type(None):
|
||||||
|
return None
|
||||||
|
elif type(time_entry) != str:
|
||||||
|
return str(time_entry)
|
43
mealie/services/scraper/open_graph.py
Normal file
43
mealie/services/scraper/open_graph.py
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
from typing import Tuple
|
||||||
|
|
||||||
|
import extruct
|
||||||
|
from app_config import DEBUG_DIR
|
||||||
|
from slugify import slugify
|
||||||
|
from w3lib.html import get_base_url
|
||||||
|
|
||||||
|
LAST_JSON = DEBUG_DIR.joinpath("last_recipe.json")
|
||||||
|
|
||||||
|
|
||||||
|
def og_field(properties: dict, field_name: str) -> str:
|
||||||
|
return next((val for name, val in properties if name == field_name), None)
|
||||||
|
|
||||||
|
|
||||||
|
def og_fields(properties: list[Tuple[str, str]], field_name: str) -> list[str]:
|
||||||
|
return list({val for name, val in properties if name == field_name})
|
||||||
|
|
||||||
|
|
||||||
|
def basic_recipe_from_opengraph(html: str, url: str) -> dict:
|
||||||
|
base_url = get_base_url(html, url)
|
||||||
|
data = extruct.extract(html, base_url=base_url)
|
||||||
|
try:
|
||||||
|
properties = data["opengraph"][0]["properties"]
|
||||||
|
except:
|
||||||
|
return
|
||||||
|
|
||||||
|
return {
|
||||||
|
"name": og_field(properties, "og:title"),
|
||||||
|
"description": og_field(properties, "og:description"),
|
||||||
|
"image": og_field(properties, "og:image"),
|
||||||
|
"recipeYield": "",
|
||||||
|
# FIXME: If recipeIngredient is an empty list, mongodb's data verification fails.
|
||||||
|
"recipeIngredient": ["Could not detect ingredients"],
|
||||||
|
# FIXME: recipeInstructions is allowed to be empty but message this is added for user sanity.
|
||||||
|
"recipeInstructions": [{"text": "Could not detect instructions"}],
|
||||||
|
"slug": slugify(og_field(properties, "og:title")),
|
||||||
|
"orgURL": og_field(properties, "og:url"),
|
||||||
|
"categories": [],
|
||||||
|
"tags": og_fields(properties, "og:article:tag"),
|
||||||
|
"dateAdded": None,
|
||||||
|
"notes": [],
|
||||||
|
"extras": [],
|
||||||
|
}
|
84
mealie/services/scraper/scraper.py
Normal file
84
mealie/services/scraper/scraper.py
Normal file
|
@ -0,0 +1,84 @@
|
||||||
|
import json
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
import requests
|
||||||
|
import scrape_schema_recipe
|
||||||
|
from app_config import DEBUG_DIR
|
||||||
|
from services.image_services import scrape_image
|
||||||
|
from services.recipe_services import Recipe
|
||||||
|
from services.scraper import open_graph
|
||||||
|
from services.scraper.cleaner import Cleaner
|
||||||
|
from utils.logger import logger
|
||||||
|
|
||||||
|
LAST_JSON = DEBUG_DIR.joinpath("last_recipe.json")
|
||||||
|
|
||||||
|
|
||||||
|
def create_from_url(url: str) -> Recipe:
|
||||||
|
"""Main entry point for generating a recipe from a URL. Pass in a URL and
|
||||||
|
a Recipe object will be returned if successful.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
url (str): a valid string representing a URL
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Recipe: Recipe Object
|
||||||
|
"""
|
||||||
|
r = requests.get(url)
|
||||||
|
new_recipe = extract_recipe_from_html(r.text, url)
|
||||||
|
new_recipe = Cleaner.clean(new_recipe)
|
||||||
|
new_recipe = download_image_for_recipe(new_recipe)
|
||||||
|
|
||||||
|
recipe = Recipe(**new_recipe)
|
||||||
|
|
||||||
|
return recipe
|
||||||
|
|
||||||
|
|
||||||
|
def extract_recipe_from_html(html: str, url: str) -> dict:
|
||||||
|
try:
|
||||||
|
scraped_recipes: List[dict] = scrape_schema_recipe.loads(
|
||||||
|
html, python_objects=True
|
||||||
|
)
|
||||||
|
dump_last_json(scraped_recipes)
|
||||||
|
|
||||||
|
if not scraped_recipes:
|
||||||
|
scraped_recipes: List[dict] = scrape_schema_recipe.scrape_url(
|
||||||
|
url, python_objects=True
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
# trying without python_objects
|
||||||
|
scraped_recipes: List[dict] = scrape_schema_recipe.loads(html)
|
||||||
|
dump_last_json(scraped_recipes)
|
||||||
|
|
||||||
|
if not scraped_recipes:
|
||||||
|
scraped_recipes: List[dict] = scrape_schema_recipe.scrape_url(url)
|
||||||
|
|
||||||
|
if scraped_recipes:
|
||||||
|
new_recipe: dict = scraped_recipes[0]
|
||||||
|
logger.info(f"Recipe Scraped From Web: {new_recipe}")
|
||||||
|
|
||||||
|
if not new_recipe:
|
||||||
|
return "fail" # TODO: Return Better Error Here
|
||||||
|
|
||||||
|
new_recipe = Cleaner.clean(new_recipe, url)
|
||||||
|
else:
|
||||||
|
new_recipe = open_graph.basic_recipe_from_opengraph(html, url)
|
||||||
|
logger.info(f"Recipe Scraped from opengraph metadata: {new_recipe}")
|
||||||
|
|
||||||
|
return new_recipe
|
||||||
|
|
||||||
|
|
||||||
|
def download_image_for_recipe(recipe: dict) -> dict:
|
||||||
|
try:
|
||||||
|
img_path = scrape_image(recipe.get("image"), recipe.get("slug"))
|
||||||
|
recipe["image"] = img_path.name
|
||||||
|
except:
|
||||||
|
recipe["image"] = "no image"
|
||||||
|
|
||||||
|
return recipe
|
||||||
|
|
||||||
|
|
||||||
|
def dump_last_json(recipe_data: dict):
|
||||||
|
with open(LAST_JSON, "w") as f:
|
||||||
|
f.write(json.dumps(recipe_data, indent=4, default=str))
|
||||||
|
|
||||||
|
return
|
|
@ -2,11 +2,8 @@ import json
|
||||||
import re
|
import re
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from services.scrape_services import (
|
from services.scraper.cleaner import Cleaner
|
||||||
extract_recipe_from_html,
|
from services.scraper.scraper import extract_recipe_from_html
|
||||||
normalize_data,
|
|
||||||
normalize_instructions,
|
|
||||||
)
|
|
||||||
from tests.test_config import TEST_RAW_HTML, TEST_RAW_RECIPES
|
from tests.test_config import TEST_RAW_HTML, TEST_RAW_RECIPES
|
||||||
|
|
||||||
# https://github.com/django/django/blob/stable/1.3.x/django/core/validators.py#L45
|
# https://github.com/django/django/blob/stable/1.3.x/django/core/validators.py#L45
|
||||||
|
@ -42,7 +39,7 @@ url_validation_regex = re.compile(
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
def test_normalize_data(json_file, num_steps):
|
def test_normalize_data(json_file, num_steps):
|
||||||
recipe_data = normalize_data(json.load(open(TEST_RAW_RECIPES.joinpath(json_file))))
|
recipe_data = Cleaner.clean(json.load(open(TEST_RAW_RECIPES.joinpath(json_file))))
|
||||||
assert len(recipe_data["recipeInstructions"]) == num_steps
|
assert len(recipe_data["recipeInstructions"]) == num_steps
|
||||||
|
|
||||||
|
|
||||||
|
@ -58,7 +55,7 @@ def test_normalize_data(json_file, num_steps):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
def test_normalize_instructions(instructions):
|
def test_normalize_instructions(instructions):
|
||||||
assert normalize_instructions(instructions) == [
|
assert Cleaner.instructions(instructions) == [
|
||||||
{"text": "A"},
|
{"text": "A"},
|
||||||
{"text": "B"},
|
{"text": "B"},
|
||||||
{"text": "C"},
|
{"text": "C"},
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue