Add updated mages and parse amounts

This commit is contained in:
Dallin Miner 2025-02-19 05:23:36 +00:00 committed by Dallin Miner
commit 31a7d0af85

View file

@ -1,11 +1,13 @@
import os
import re
import tempfile
import zipfile
from fractions import Fraction
from pathlib import Path
from typing import Any
from mealie.schema.recipe.recipe_ingredient import RecipeIngredientBase
from mealie.schema.recipe.recipe_ingredient import RecipeIngredient, SaveIngredientFood, SaveIngredientUnit
from mealie.services.parser_services._base import DataMatcher
from ._migration_base import BaseMigrator
from .utils.migration_helpers import import_image
@ -25,31 +27,15 @@ def _format_time(minutes: int) -> str:
def convert_to_float(value):
try:
value = value.strip() # Remove any surrounding spaces
if value == "":
value = "1"
if " " in value: # Check for mixed fractions like "1 1/2"
# Split into whole number and fraction
whole, fraction = value.split(" ", 1)
return float(whole) + float(Fraction(fraction))
return float(Fraction(value)) # Convert fraction or whole number
except (ValueError, ZeroDivisionError):
return None # Return None for invalid values
def extract_instructions(instructions: str) -> list[str]:
"""Splits the instruction text into steps."""
return instructions.split("\n") if instructions else []
# def extract_ingredients(ingredient_data: list[dict[str, Any]]) -> list[dict[str, Any]]:
# """Extracts ingredient details from parsed Cook'n ingredient data."""
# ingredients = []
# for ingredient in ingredient_data:
# base_ingredient = RecipeIngredientBase(
# quantity=ingredient.get("AMOUNT_QTY", "1"),
# unit=ingredient.get("AMOUNT_UNIT"),
# food=ingredient.get("INGREDIENT_FOOD_ID"),
# )
# ingredients.append({"title": None, "note": base_ingredient.display})
# return ingredients
return 1.0 # Return 1.0 for invalid values
class DSVParser:
@ -93,6 +79,12 @@ class DSVParser:
return results[0]
return results
def get_data(self, row, column):
data = row.get(column, "")
if data is None or data == "[null]":
data = ""
return data
def get_table(self, table_name: str):
"""Returns the entire table as a list of dictionaries."""
if table_name not in self.tables:
@ -109,42 +101,70 @@ class CooknMigrator(BaseMigrator):
super().__init__(**kwargs)
self.name = "cookn"
self.key_aliases = []
self.matcher = DataMatcher(self.db)
def _process_recipe_document(self, _recipe_row, db) -> dict:
recipe_data = {}
def _parse_units_table(self, db):
_units_table = db.get_table("temp_unit")
for _unit_row in _units_table:
if int(db.get_data(_unit_row, "FREQUENCY")) > 0:
name = db.get_data(_unit_row, "NAME")
plural_name = db.get_data(_unit_row, "PLURAL_NAME")
abbreviation = db.get_data(_unit_row, "ABBREVIATION")
# Select db values
_recipe_id = _recipe_row["ID"]
_recipe_desc_row = db.query_by_id("temp_recipe_desc", "ID", [_recipe_id], return_first_only=True)
_chapter_id = _recipe_desc_row["PARENT"]
_chapter_row = db.query_by_id("temp_chapter_desc", "ID", [_chapter_id], return_first_only=True)
_cookbook_id = _chapter_row["PARENT"]
_cookbook_row = db.query_by_id("temp_cookBook_desc", "ID", [_cookbook_id], return_first_only=True)
_media_row = db.query_by_id("temp_media", "ENTITY_ID", [_recipe_id], return_first_only=True)
_media_id = _media_row.get("ID", "")
match = self.matcher.find_unit_match(name)
if match is None:
save = SaveIngredientUnit(
group_id=self.group.id,
name=name,
plural_name=plural_name,
abbreviation=abbreviation,
)
# Parse general recipe info
cookbook = _cookbook_row.get("TITLE", "")
chapter = _chapter_row.get("TITLE", "")
name = _recipe_desc_row.get("TITLE", "")
description = _recipe_desc_row.get("DESCRIPTION", "")
serves = _recipe_row["SERVES"]
prep_time = int(_recipe_row["PREPTIME"])
cook_time = int(_recipe_row["COOKTIME"])
try:
self.db.ingredient_units.create(save)
except Exception as e:
self.logger.error(e)
recipe_data["recipeCategory"] = [cookbook]
recipe_data["tags"] = [chapter]
recipe_data["name"] = name
recipe_data["description"] = description
recipe_data["recipeYield"] = serves
recipe_data["prepTime"] = _format_time(prep_time)
recipe_data["performTime"] = _format_time(cook_time)
recipe_data["totalTime"] = _format_time(prep_time + cook_time)
def _parse_foods_table(self, db):
_foods_table = db.get_table("temp_food")
for _food_row in _foods_table:
if int(db.get_data(_food_row, "FREQUENCY")) > 0:
name = db.get_data(_food_row, "NAME")
plural_name = db.get_data(_food_row, "PLURAL_NAME")
# Parse and rename image
match = self.matcher.find_food_match(name)
if match is None:
save = SaveIngredientFood(
group_id=self.group.id, name=name, plural_name=plural_name, description=""
)
try:
self.db.ingredient_foods.create(save)
except Exception as e:
self.logger.error(e)
def _parse_media(self, _cookbook_id, _chapter_id, _recipe_id, db):
_media_recipe_row = db.query_by_id("temp_media", "ENTITY_ID", [_recipe_id], return_first_only=True)
_media_chapter_row = db.query_by_id("temp_media", "ENTITY_ID", [_chapter_id], return_first_only=True)
_media_cookbook_row = db.query_by_id("temp_media", "ENTITY_ID", [_cookbook_id], return_first_only=True)
# Get recipe image
_media_row = _media_recipe_row
_media_id = db.get_data(_media_row, "ID")
if _media_id == "":
# Get chaper image if no recipe image
_media_row = _media_chapter_row
_media_id = db.get_data(_media_row, "ID")
if _media_id == "":
# Get cookbook image if no chapter image
_media_row = _media_cookbook_row
_media_id = db.get_data(_media_row, "ID")
# If we found an image
if _media_id != "":
_media_type = _media_row["MEDIA_CONTENT_TYPE"]
_media_type = db.get_data(_media_row, "MEDIA_CONTENT_TYPE")
# If the file has no extention add one (this is the normal case)
if Path(str(_media_id)).suffix == "":
if _media_type != "":
# Determine file extension based on media type
_extension = _media_type.split("/")[-1]
_old_image_path = os.path.join(db.directory, str(_media_id))
@ -153,87 +173,206 @@ class CooknMigrator(BaseMigrator):
if os.path.exists(_old_image_path) and not os.path.exists(new_image_path):
os.rename(_old_image_path, new_image_path)
if Path(new_image_path).exists():
recipe_data["image"] = [new_image_path]
return new_image_path
else:
return os.path.join(db.directory, str(_media_id))
return None
def _parse_ingrediants(self, _recipe_id, db):
# Parse ingrediants
ingredients = []
ingrediants_order = []
_ingrediant_rows = db.query_by_id("temp_ingredient", "PARENT_ID", [_recipe_id])
for _ingrediant_row in _ingrediant_rows:
_unit_id = _ingrediant_row.get("AMOUNT_UNIT", "")
_unit_id = db.get_data(_ingrediant_row, "AMOUNT_UNIT")
_unit_row = db.query_by_id("temp_unit", "ID", [_unit_id], return_first_only=True)
_food_id = _ingrediant_row.get("INGREDIENT_FOOD_ID", "")
_food_id = db.get_data(_ingrediant_row, "INGREDIENT_FOOD_ID")
_food_row = db.query_by_id("temp_food", "ID", [_food_id], return_first_only=True)
_brand_id = _ingrediant_row.get("BRAND_ID", "")
_brand_id = db.get_data(_ingrediant_row, "BRAND_ID")
_brand_row = db.query_by_id("temp_brand", "ID", [_brand_id], return_first_only=True)
amount = convert_to_float(_ingrediant_row.get("AMOUNT_QTY_STRING", "1"))
unit = _unit_row.get("ABBREVIATION")
amount = convert_to_float(db.get_data(_ingrediant_row, "AMOUNT_QTY_STRING"))
unit_name = db.get_data(_unit_row, "NAME")
food_name = db.get_data(_food_row, "NAME")
food_name_singluar = _food_row.get("NAME", "")
food_name_plural = _food_row.get("PLURAL_NAME", "")
if food_name_singluar != "" and food_name_plural != "":
if unit is None:
if amount is not None and amount > 1:
food_name = _food_row.get("PLURAL_NAME", "")
else:
food_name = _food_row.get("NAME", "")
else:
food_name = _food_row.get("NAME", "")
else:
if food_name_singluar != "":
food_name = food_name_singluar
else:
food_name = food_name_plural
unit = self.matcher.find_unit_match(unit_name)
if unit == "":
unit = None
food = self.matcher.find_food_match(food_name)
if food == "":
food = None
if unit is None:
unit = ""
else:
if amount is not None and amount > 1:
unit = _unit_row.get("PLURAL_NAME")
else:
unit = _unit_row.get("NAME")
pre_qualifier = db.get_data(_ingrediant_row, "PRE_QUALIFIER").rstrip()
post_qualifier = db.get_data(_ingrediant_row, "POST_QUALIFIER").rstrip()
brand = db.get_data(_brand_row, "NAME")
pre_qualifier = _ingrediant_row.get("PRE_QUALIFIER", "")
if pre_qualifier == "[null]":
pre_qualifier = ""
post_qualifier = _ingrediant_row.get("POST_QUALIFIER", "")
if post_qualifier == "[null]":
post_qualifier = ""
brand = _brand_row.get("NAME", "")
if brand == "[null]":
brand = ""
note = ""
if pre_qualifier != "":
if pre_qualifier[-1] == ",":
pre_qualifier = pre_qualifier[:-1]
note += pre_qualifier
if post_qualifier != "":
if pre_qualifier != "":
note += ", "
if post_qualifier[-1] == ",":
post_qualifier = post_qualifier[:-1]
note += post_qualifier
base_ingredient = RecipeIngredientBase(
base_ingredient = RecipeIngredient(
quantity=amount,
unit=unit,
food=pre_qualifier + " " + food_name + " " + post_qualifier + " " + brand,
notes=None,
food=food,
note=note,
original_text=f"{amount} {unit_name} {pre_qualifier} {food_name} {post_qualifier} {brand}",
disable_amount=False,
)
_display_order = int(_ingrediant_row.get("DISPLAY_ORDER", ""))
ingredients.append({"title": None, "order": _display_order, "note": base_ingredient.display})
ingredients = sorted(ingredients, key=lambda d: d["order"])
recipe_data["recipeIngredient"] = ingredients
try:
_display_order = db.get_data(_ingrediant_row, "DISPLAY_ORDER")
ingrediants_order.append(int(_display_order))
ingredients.append(base_ingredient)
except ValueError:
self.logger.warning("Invalid ingrediant order: %s, %s", _display_order, base_ingredient.original_text)
continue
return [obj for _, obj in sorted(zip(ingrediants_order, ingredients, strict=False))]
def _parse_instructions(self, instructions: str) -> list[str]:
"""
Parses recipe instructions into a list of steps.
Detects numbered lists, bulleted lists, and plain new-line-separated steps.
"""
# Detects numbered lists (1., 1), 1-, etc.) and bulleted lists (-, *, •)
numbered_pattern = re.compile(r"^(\d+)[.)-]\s*(.*)")
bullet_pattern = re.compile(r"^[\-*•]\s*(.*)")
lines = instructions.splitlines()
steps = []
current_step = []
for line in lines:
line = line.strip()
if not line:
continue # Skip empty lines
num_match = numbered_pattern.match(line)
bullet_match = bullet_pattern.match(line)
if num_match:
# If there's a current step, store it before starting a new one
if current_step:
steps.append("\n".join(current_step))
current_step = []
current_step.append(num_match.group(2))
elif bullet_match:
if current_step:
steps.append("\n".join(current_step))
current_step = []
current_step.append(bullet_match.group(1))
else:
# Continuation of a previous step
if current_step:
current_step.append(line)
else:
# If no clear separator is found, treat each new line as a new step
steps.append(line)
if current_step:
steps.append(" ".join(current_step))
return steps
def _process_recipe_document(self, _recipe_row, db) -> dict:
recipe_data = {}
# Select db values
_recipe_id = db.get_data(_recipe_row, "ID")
_recipe_desc_row = db.query_by_id("temp_recipe_desc", "ID", [_recipe_id], return_first_only=True)
_chapter_id = db.get_data(_recipe_desc_row, "PARENT")
_chapter_row = db.query_by_id("temp_chapter_desc", "ID", [_chapter_id], return_first_only=True)
_cookbook_id = db.get_data(_chapter_row, "PARENT")
_cookbook_row = db.query_by_id("temp_cookBook_desc", "ID", [_cookbook_id], return_first_only=True)
_media_row = db.query_by_id("temp_media", "ENTITY_ID", [_recipe_id], return_first_only=True)
# Parse general recipe info
cookbook = db.get_data(_cookbook_row, "TITLE")
chapter = db.get_data(_chapter_row, "TITLE")
name = db.get_data(_recipe_desc_row, "TITLE")
description = db.get_data(_recipe_desc_row, "DESCRIPTION")
serves = db.get_data(_recipe_row, "SERVES")
# yields = db.get_data(_recipe_row, "YIELD")
prep_time = int(db.get_data(_recipe_row, "PREPTIME"))
cook_time = int(db.get_data(_recipe_row, "COOKTIME"))
recipe_data["recipeCategory"] = [cookbook + " - " + chapter]
recipe_data["name"] = name
recipe_data["description"] = description
recipe_data["recipeYield"] = serves
recipe_data["prepTime"] = _format_time(prep_time)
recipe_data["performTime"] = _format_time(cook_time)
recipe_data["totalTime"] = _format_time(prep_time + cook_time)
# Parse image file
image_path = self._parse_media(_cookbook_id, _chapter_id, _recipe_id, db)
if import_image is not None:
recipe_data["image"] = [image_path]
# Parse ingrediants
recipe_data["ingrediants"] = self._parse_ingrediants(_recipe_id, db)
# Parse instructions
recipe_data["recipeInstructions"] = extract_instructions(_recipe_row["INSTRUCTIONS"])
recipe_data["recipeInstructions"] = self._parse_instructions(db.get_data(_recipe_row, "INSTRUCTIONS"))
return recipe_data
def _process_cookbook(self, path):
source_dir = self.get_zip_base_path(path)
db = DSVParser(source_dir)
self._parse_units_table(db)
self._parse_foods_table(db)
# Reload matcher with updated tables
self.matcher = DataMatcher(self.db)
_recipe_table = db.get_table("temp_recipe")
recipes_as_dicts = [self._process_recipe_document(_recipe_row, db) for _recipe_row in _recipe_table]
recipes = []
for r in recipes_as_dicts:
ingrediants = r["ingrediants"]
r = self.clean_recipe_dictionary(r)
r.recipe_ingredient = ingrediants
recipes.append(r)
results = self.import_recipes_to_database(recipes)
# If I use the slugs from the results rather than the recipe lookup, I can properly import images
recipe_lookup = {r.slug: r for r in recipes}
for slug, recipe_id, status in results:
if status:
r = recipe_lookup.get(slug)
if r:
if r.image:
import_image(r.image, recipe_id)
else:
index_len = len(slug.split("-")[-1])
r = recipe_lookup.get(slug[: -(index_len + 1)])
self.logger.warning("Duplicate recipe (%s) found! Saved as copy...", r.name)
if r.image:
import_image(r.image, recipe_id)
def _migrate(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
with zipfile.ZipFile(self.archive) as zip_file:
zip_file.extractall(tmpdir)
source_dir = self.get_zip_base_path(Path(tmpdir))
db = DSVParser(source_dir)
_recipe_table = db.get_table("temp_recipe")
recipes_as_dicts = [self._process_recipe_document(_recipe_row, db) for _recipe_row in _recipe_table]
# Process single zipped cookbook
if Path(f"{tmpdir}/temp_recipe.dsv").exists():
self._process_cookbook(Path(tmpdir))
recipes = [self.clean_recipe_dictionary(x) for x in recipes_as_dicts]
results = self.import_recipes_to_database(recipes)
recipe_lookup = {r.slug: r for r in recipes}
for slug, recipe_id, status in results:
if status:
r = recipe_lookup.get(slug)
if r and r.image:
import_image(r.image, recipe_id)
# Process a zip folder of zipped cookbooks
for file in Path(tmpdir).glob("*.zip"):
with tempfile.TemporaryDirectory() as tmpdir2:
with zipfile.ZipFile(file) as zip_file2:
zip_file2.extractall(tmpdir2)
self._process_cookbook(Path(tmpdir2))