diff --git a/mealie/services/migrations/cookn.py b/mealie/services/migrations/cookn.py
index f88c8e8b7..eb4c46f32 100644
--- a/mealie/services/migrations/cookn.py
+++ b/mealie/services/migrations/cookn.py
@@ -1,11 +1,13 @@
 import os
+import re
 import tempfile
 import zipfile
 from fractions import Fraction
 from pathlib import Path
 from typing import Any
 
-from mealie.schema.recipe.recipe_ingredient import RecipeIngredientBase
+from mealie.schema.recipe.recipe_ingredient import RecipeIngredient, SaveIngredientFood, SaveIngredientUnit
+from mealie.services.parser_services._base import DataMatcher
 
 from ._migration_base import BaseMigrator
 from .utils.migration_helpers import import_image
@@ -25,31 +27,15 @@ def _format_time(minutes: int) -> str:
 def convert_to_float(value):
     try:
         value = value.strip()  # Remove any surrounding spaces
+        if value == "":
+            value = "1"
         if " " in value:  # Check for mixed fractions like "1 1/2"
             # Split into whole number and fraction
             whole, fraction = value.split(" ", 1)
             return float(whole) + float(Fraction(fraction))
         return float(Fraction(value))  # Convert fraction or whole number
     except (ValueError, ZeroDivisionError):
-        return None  # Return None for invalid values
-
-
-def extract_instructions(instructions: str) -> list[str]:
-    """Splits the instruction text into steps."""
-    return instructions.split("\n") if instructions else []
-
-
-# def extract_ingredients(ingredient_data: list[dict[str, Any]]) -> list[dict[str, Any]]:
-#     """Extracts ingredient details from parsed Cook'n ingredient data."""
-#     ingredients = []
-#     for ingredient in ingredient_data:
-#         base_ingredient = RecipeIngredientBase(
-#             quantity=ingredient.get("AMOUNT_QTY", "1"),
-#             unit=ingredient.get("AMOUNT_UNIT"),
-#             food=ingredient.get("INGREDIENT_FOOD_ID"),
-#         )
-#         ingredients.append({"title": None, "note": base_ingredient.display})
-#     return ingredients
+        return 1.0  # Return 1.0 for invalid values
 
 
 class DSVParser:
@@ -93,6 +79,12 @@ class DSVParser:
             return results[0]
         return results
 
+    def get_data(self, row, column):
+        data = row.get(column, "")
+        if data is None or data == "[null]":
+            data = ""
+        return data
+
     def get_table(self, table_name: str):
         """Returns the entire table as a list of dictionaries."""
         if table_name not in self.tables:
@@ -109,31 +101,211 @@ class CooknMigrator(BaseMigrator):
         super().__init__(**kwargs)
         self.name = "cookn"
         self.key_aliases = []
+        self.matcher = DataMatcher(self.db)
+
+    def _parse_units_table(self, db):
+        _units_table = db.get_table("temp_unit")
+        for _unit_row in _units_table:
+            if int(db.get_data(_unit_row, "FREQUENCY")) > 0:
+                name = db.get_data(_unit_row, "NAME")
+                plural_name = db.get_data(_unit_row, "PLURAL_NAME")
+                abbreviation = db.get_data(_unit_row, "ABBREVIATION")
+
+                match = self.matcher.find_unit_match(name)
+                if match is None:
+                    save = SaveIngredientUnit(
+                        group_id=self.group.id,
+                        name=name,
+                        plural_name=plural_name,
+                        abbreviation=abbreviation,
+                    )
+
+                    try:
+                        self.db.ingredient_units.create(save)
+                    except Exception as e:
+                        self.logger.error(e)
+
+    def _parse_foods_table(self, db):
+        _foods_table = db.get_table("temp_food")
+        for _food_row in _foods_table:
+            if int(db.get_data(_food_row, "FREQUENCY")) > 0:
+                name = db.get_data(_food_row, "NAME")
+                plural_name = db.get_data(_food_row, "PLURAL_NAME")
+
+                match = self.matcher.find_food_match(name)
+                if match is None:
+                    save = SaveIngredientFood(
+                        group_id=self.group.id, name=name, plural_name=plural_name, description=""
+                    )
+                    try:
+                        self.db.ingredient_foods.create(save)
+                    except Exception as e:
+                        self.logger.error(e)
+
+    def _parse_media(self, _cookbook_id, _chapter_id, _recipe_id, db):
+        _media_recipe_row = db.query_by_id("temp_media", "ENTITY_ID", [_recipe_id], return_first_only=True)
+        _media_chapter_row = db.query_by_id("temp_media", "ENTITY_ID", [_chapter_id], return_first_only=True)
+        _media_cookbook_row = db.query_by_id("temp_media", "ENTITY_ID", [_cookbook_id], return_first_only=True)
+
+        # Get recipe image
+        _media_row = _media_recipe_row
+        _media_id = db.get_data(_media_row, "ID")
+        if _media_id == "":
+            # Get chapter image if no recipe image
+            _media_row = _media_chapter_row
+            _media_id = db.get_data(_media_row, "ID")
+            if _media_id == "":
+                # Get cookbook image if no chapter image
+                _media_row = _media_cookbook_row
+                _media_id = db.get_data(_media_row, "ID")
+
+        # If we found an image
+        if _media_id != "":
+            _media_type = db.get_data(_media_row, "MEDIA_CONTENT_TYPE")
+            # If the file has no extension add one (this is the normal case)
+            if Path(str(_media_id)).suffix == "":
+                if _media_type != "":
+                    # Determine file extension based on media type
+                    _extension = _media_type.split("/")[-1]
+                    _old_image_path = os.path.join(db.directory, str(_media_id))
+                    new_image_path = f"{_old_image_path}.{_extension}"
+                    # Rename the file if it exists and has no extension
+                    if os.path.exists(_old_image_path) and not os.path.exists(new_image_path):
+                        os.rename(_old_image_path, new_image_path)
+                    if Path(new_image_path).exists():
+                        return new_image_path
+            else:
+                return os.path.join(db.directory, str(_media_id))
+        return None
+
+    def _parse_ingredients(self, _recipe_id, db):
+        # Parse ingredients
+        ingredients = []
+        ingredients_order = []
+        _ingredient_rows = db.query_by_id("temp_ingredient", "PARENT_ID", [_recipe_id])
+        for _ingredient_row in _ingredient_rows:
+            _unit_id = db.get_data(_ingredient_row, "AMOUNT_UNIT")
+            _unit_row = db.query_by_id("temp_unit", "ID", [_unit_id], return_first_only=True)
+            _food_id = db.get_data(_ingredient_row, "INGREDIENT_FOOD_ID")
+            _food_row = db.query_by_id("temp_food", "ID", [_food_id], return_first_only=True)
+            _brand_id = db.get_data(_ingredient_row, "BRAND_ID")
+            _brand_row = db.query_by_id("temp_brand", "ID", [_brand_id], return_first_only=True)
+
+            amount = convert_to_float(db.get_data(_ingredient_row, "AMOUNT_QTY_STRING"))
+            unit_name = db.get_data(_unit_row, "NAME")
+            food_name = db.get_data(_food_row, "NAME")
+
+            unit = self.matcher.find_unit_match(unit_name)
+            if unit == "":
+                unit = None
+            food = self.matcher.find_food_match(food_name)
+            if food == "":
+                food = None
+
+            pre_qualifier = db.get_data(_ingredient_row, "PRE_QUALIFIER").rstrip()
+            post_qualifier = db.get_data(_ingredient_row, "POST_QUALIFIER").rstrip()
+            brand = db.get_data(_brand_row, "NAME")
+
+            note = ""
+            if pre_qualifier != "":
+                if pre_qualifier[-1] == ",":
+                    pre_qualifier = pre_qualifier[:-1]
+                note += pre_qualifier
+            if post_qualifier != "":
+                if pre_qualifier != "":
+                    note += ", "
+                if post_qualifier[-1] == ",":
+                    post_qualifier = post_qualifier[:-1]
+                note += post_qualifier
+
+            base_ingredient = RecipeIngredient(
+                quantity=amount,
+                unit=unit,
+                food=food,
+                note=note,
+                original_text=f"{amount} {unit_name} {pre_qualifier} {food_name} {post_qualifier} {brand}",
+                disable_amount=False,
+            )
+            try:
+                _display_order = db.get_data(_ingredient_row, "DISPLAY_ORDER")
+                ingredients_order.append(int(_display_order))
+                ingredients.append(base_ingredient)
+            except ValueError:
+                self.logger.warning("Invalid ingredient order: %s, %s", _display_order, base_ingredient.original_text)
+                continue
+        return [obj for _, obj in sorted(zip(ingredients_order, ingredients, strict=False), key=lambda p: p[0])]
+
+    def _parse_instructions(self, instructions: str) -> list[str]:
+        """
+        Parses recipe instructions into a list of steps.
+        Detects numbered lists, bulleted lists, and plain new-line-separated steps.
+        """
+        # Detects numbered lists (1., 1), 1-, etc.) and bulleted lists (-, *, •)
+        numbered_pattern = re.compile(r"^(\d+)[.)-]\s*(.*)")
+        bullet_pattern = re.compile(r"^[\-*•]\s*(.*)")
+
+        lines = instructions.splitlines()
+        steps = []
+        current_step = []
+
+        for line in lines:
+            line = line.strip()
+
+            if not line:
+                continue  # Skip empty lines
+
+            num_match = numbered_pattern.match(line)
+            bullet_match = bullet_pattern.match(line)
+
+            if num_match:
+                # If there's a current step, store it before starting a new one
+                if current_step:
+                    steps.append("\n".join(current_step))
+                    current_step = []
+
+                current_step.append(num_match.group(2))
+            elif bullet_match:
+                if current_step:
+                    steps.append("\n".join(current_step))
+                    current_step = []
+
+                current_step.append(bullet_match.group(1))
+            else:
+                # Continuation of a previous step
+                if current_step:
+                    current_step.append(line)
+                else:
+                    # If no clear separator is found, treat each new line as a new step
+                    steps.append(line)
+
+        if current_step:
+            steps.append("\n".join(current_step))
+
+        return steps
 
     def _process_recipe_document(self, _recipe_row, db) -> dict:
         recipe_data = {}
 
         # Select db values
-        _recipe_id = _recipe_row["ID"]
+        _recipe_id = db.get_data(_recipe_row, "ID")
         _recipe_desc_row = db.query_by_id("temp_recipe_desc", "ID", [_recipe_id], return_first_only=True)
-        _chapter_id = _recipe_desc_row["PARENT"]
+        _chapter_id = db.get_data(_recipe_desc_row, "PARENT")
         _chapter_row = db.query_by_id("temp_chapter_desc", "ID", [_chapter_id], return_first_only=True)
-        _cookbook_id = _chapter_row["PARENT"]
+        _cookbook_id = db.get_data(_chapter_row, "PARENT")
         _cookbook_row = db.query_by_id("temp_cookBook_desc", "ID", [_cookbook_id], return_first_only=True)
         _media_row = db.query_by_id("temp_media", "ENTITY_ID", [_recipe_id], return_first_only=True)
-        _media_id = _media_row.get("ID", "")
 
         # Parse general recipe info
-        cookbook = _cookbook_row.get("TITLE", "")
-        chapter = _chapter_row.get("TITLE", "")
-        name = _recipe_desc_row.get("TITLE", "")
-        description = _recipe_desc_row.get("DESCRIPTION", "")
-        serves = _recipe_row["SERVES"]
-        prep_time = int(_recipe_row["PREPTIME"])
-        cook_time = int(_recipe_row["COOKTIME"])
+        cookbook = db.get_data(_cookbook_row, "TITLE")
+        chapter = db.get_data(_chapter_row, "TITLE")
+        name = db.get_data(_recipe_desc_row, "TITLE")
+        description = db.get_data(_recipe_desc_row, "DESCRIPTION")
+        serves = db.get_data(_recipe_row, "SERVES")
+        # yields = db.get_data(_recipe_row, "YIELD")
+        prep_time = int(db.get_data(_recipe_row, "PREPTIME"))
+        cook_time = int(db.get_data(_recipe_row, "COOKTIME"))
 
-        recipe_data["recipeCategory"] = [cookbook]
-        recipe_data["tags"] = [chapter]
+        recipe_data["recipeCategory"] = [cookbook + " - " + chapter]
         recipe_data["name"] = name
         recipe_data["description"] = description
         recipe_data["recipeYield"] = serves
@@ -141,99 +313,66 @@ class CooknMigrator(BaseMigrator):
         recipe_data["performTime"] = _format_time(cook_time)
         recipe_data["totalTime"] = _format_time(prep_time + cook_time)
 
-        # Parse and rename image
-
-        if _media_id != "":
-            _media_type = _media_row["MEDIA_CONTENT_TYPE"]
-            # Determine file extension based on media type
-            _extension = _media_type.split("/")[-1]
-            _old_image_path = os.path.join(db.directory, str(_media_id))
-            new_image_path = f"{_old_image_path}.{_extension}"
-            # Rename the file if it exists and has no extension
-            if os.path.exists(_old_image_path) and not os.path.exists(new_image_path):
-                os.rename(_old_image_path, new_image_path)
-            if Path(new_image_path).exists():
-                recipe_data["image"] = [new_image_path]
+        # Parse image file
+        image_path = self._parse_media(_cookbook_id, _chapter_id, _recipe_id, db)
+        if image_path is not None:
+            recipe_data["image"] = [image_path]
 
         # Parse ingrediants
-        ingredients = []
-        _ingrediant_rows = db.query_by_id("temp_ingredient", "PARENT_ID", [_recipe_id])
-        for _ingrediant_row in _ingrediant_rows:
-            _unit_id = _ingrediant_row.get("AMOUNT_UNIT", "")
-            _unit_row = db.query_by_id("temp_unit", "ID", [_unit_id], return_first_only=True)
-            _food_id = _ingrediant_row.get("INGREDIENT_FOOD_ID", "")
-            _food_row = db.query_by_id("temp_food", "ID", [_food_id], return_first_only=True)
-            _brand_id = _ingrediant_row.get("BRAND_ID", "")
-            _brand_row = db.query_by_id("temp_brand", "ID", [_brand_id], return_first_only=True)
-
-            amount = convert_to_float(_ingrediant_row.get("AMOUNT_QTY_STRING", "1"))
-            unit = _unit_row.get("ABBREVIATION")
-
-            food_name_singluar = _food_row.get("NAME", "")
-            food_name_plural = _food_row.get("PLURAL_NAME", "")
-            if food_name_singluar != "" and food_name_plural != "":
-                if unit is None:
-                    if amount is not None and amount > 1:
-                        food_name = _food_row.get("PLURAL_NAME", "")
-                    else:
-                        food_name = _food_row.get("NAME", "")
-                else:
-                    food_name = _food_row.get("NAME", "")
-            else:
-                if food_name_singluar != "":
-                    food_name = food_name_singluar
-                else:
-                    food_name = food_name_plural
-
-            if unit is None:
-                unit = ""
-            else:
-                if amount is not None and amount > 1:
-                    unit = _unit_row.get("PLURAL_NAME")
-                else:
-                    unit = _unit_row.get("NAME")
-
-            pre_qualifier = _ingrediant_row.get("PRE_QUALIFIER", "")
-            if pre_qualifier == "[null]":
-                pre_qualifier = ""
-            post_qualifier = _ingrediant_row.get("POST_QUALIFIER", "")
-            if post_qualifier == "[null]":
-                post_qualifier = ""
-            brand = _brand_row.get("NAME", "")
-            if brand == "[null]":
-                brand = ""
-
-            base_ingredient = RecipeIngredientBase(
-                quantity=amount,
-                unit=unit,
-                food=pre_qualifier + " " + food_name + " " + post_qualifier + " " + brand,
-                notes=None,
-            )
-            _display_order = int(_ingrediant_row.get("DISPLAY_ORDER", ""))
-            ingredients.append({"title": None, "order": _display_order, "note": base_ingredient.display})
-        ingredients = sorted(ingredients, key=lambda d: d["order"])
-        recipe_data["recipeIngredient"] = ingredients
+        recipe_data["ingredients"] = self._parse_ingredients(_recipe_id, db)
 
         # Parse instructions
-        recipe_data["recipeInstructions"] = extract_instructions(_recipe_row["INSTRUCTIONS"])
+        recipe_data["recipeInstructions"] = self._parse_instructions(db.get_data(_recipe_row, "INSTRUCTIONS"))
 
         return recipe_data
 
+    def _process_cookbook(self, path):
+        source_dir = self.get_zip_base_path(path)
+        db = DSVParser(source_dir)
+        self._parse_units_table(db)
+        self._parse_foods_table(db)
+        # Reload matcher with updated tables
+        self.matcher = DataMatcher(self.db)
+
+        _recipe_table = db.get_table("temp_recipe")
+        recipes_as_dicts = [self._process_recipe_document(_recipe_row, db) for _recipe_row in _recipe_table]
+
+        recipes = []
+        for r in recipes_as_dicts:
+            ingredients = r["ingredients"]
+            r = self.clean_recipe_dictionary(r)
+            r.recipe_ingredient = ingredients
+            recipes.append(r)
+
+        results = self.import_recipes_to_database(recipes)
+        # Look up recipes by slug so their images can be imported (duplicate slugs gain a numeric suffix)
+        recipe_lookup = {r.slug: r for r in recipes}
+        for slug, recipe_id, status in results:
+            if status:
+                r = recipe_lookup.get(slug)
+                if r:
+                    if r.image:
+                        import_image(r.image, recipe_id)
+            else:
+                index_len = len(slug.split("-")[-1])
+                r = recipe_lookup.get(slug[: -(index_len + 1)])
+                self.logger.warning("Duplicate recipe (%s) found! Saved as copy...", r.name if r else slug)
+                if r and r.image:
+                    import_image(r.image, recipe_id)
+
     def _migrate(self) -> None:
         with tempfile.TemporaryDirectory() as tmpdir:
             with zipfile.ZipFile(self.archive) as zip_file:
                 zip_file.extractall(tmpdir)
 
-            source_dir = self.get_zip_base_path(Path(tmpdir))
-            db = DSVParser(source_dir)
-            _recipe_table = db.get_table("temp_recipe")
-            recipes_as_dicts = [self._process_recipe_document(_recipe_row, db) for _recipe_row in _recipe_table]
+            # Process single zipped cookbook
+            if Path(f"{tmpdir}/temp_recipe.dsv").exists():
+                self._process_cookbook(Path(tmpdir))
 
-            recipes = [self.clean_recipe_dictionary(x) for x in recipes_as_dicts]
-            results = self.import_recipes_to_database(recipes)
-            recipe_lookup = {r.slug: r for r in recipes}
-            for slug, recipe_id, status in results:
-                if status:
-                    r = recipe_lookup.get(slug)
-                    if r and r.image:
-                        import_image(r.image, recipe_id)
+            # Process a zip folder of zipped cookbooks
+            for file in Path(tmpdir).glob("*.zip"):
+                with tempfile.TemporaryDirectory() as tmpdir2:
+                    with zipfile.ZipFile(file) as zip_file2:
+                        zip_file2.extractall(tmpdir2)
+
+                        self._process_cookbook(Path(tmpdir2))
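
Note: a minimal standalone sketch of the step-splitting heuristic used by _parse_instructions, run
against a made-up instruction string. It reuses the same numbered/bulleted regexes as the patch but
omits the continuation-line merging of the full method; it is illustrative only and not part of the module.

    import re

    # Same patterns as _parse_instructions: "1.", "2)", "3-" markers and "-", "*", "•" bullets
    numbered_pattern = re.compile(r"^(\d+)[.)-]\s*(.*)")
    bullet_pattern = re.compile(r"^[\-*•]\s*(.*)")

    sample = "1. Preheat oven to 350F\n2) Mix the dry ingredients\n- Fold in the butter\nBake until golden"

    steps = []
    for line in sample.splitlines():
        line = line.strip()
        if not line:
            continue  # skip empty lines
        num = numbered_pattern.match(line)
        bullet = bullet_pattern.match(line)
        if num:
            steps.append(num.group(2))  # drop the "1." / "2)" marker
        elif bullet:
            steps.append(bullet.group(1))  # drop the "-" / "*" bullet
        else:
            steps.append(line)  # plain line becomes its own step

    print(steps)
    # ['Preheat oven to 350F', 'Mix the dry ingredients', 'Fold in the butter', 'Bake until golden']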
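
Note: a short usage sketch of the quantity parsing that convert_to_float now performs, assuming the
same fall-back-to-1.0 behaviour as the patched function. The helper name to_float below is hypothetical
and only mirrors the patch for illustration.

    from fractions import Fraction

    def to_float(value: str) -> float:
        # Mirrors the patched convert_to_float: empty or unparsable input falls back to 1.0
        try:
            value = value.strip()
            if value == "":
                value = "1"
            if " " in value:  # mixed fraction such as "1 1/2"
                whole, fraction = value.split(" ", 1)
                return float(whole) + float(Fraction(fraction))
            return float(Fraction(value))  # plain fraction or whole number
        except (ValueError, ZeroDivisionError):
            return 1.0

    print(to_float("1 1/2"))    # 1.5
    print(to_float("3/4"))      # 0.75
    print(to_float(""))         # 1.0
    print(to_float("a pinch"))  # 1.0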