mirror of
https://github.com/hay-kot/mealie.git
synced 2025-08-22 06:23:34 -07:00
convert chowdown to new methods
This commit is contained in:
parent
9b935a5d1b
commit
423b264e4f
9 changed files with 114 additions and 252 deletions
File diff suppressed because one or more lines are too long
|
@ -9,8 +9,6 @@ from mealie.routes.deps import get_current_user
|
|||
from mealie.schema.migration import MigrationFile, Migrations
|
||||
from mealie.schema.snackbar import SnackResponse
|
||||
from mealie.services.migrations import migration
|
||||
from mealie.services.migrations.chowdown import chowdown_migrate as chowdow_migrate
|
||||
from mealie.services.migrations.nextcloud import migrate as nextcloud_migrate
|
||||
from sqlalchemy.orm.session import Session
|
||||
|
||||
router = APIRouter(prefix="/api/migrations", tags=["Migration"], dependencies=[Depends(get_current_user)])
|
||||
|
@ -38,20 +36,10 @@ def get_all_migration_options():
|
|||
|
||||
|
||||
@router.post("/{import_type}/{file_name}/import")
|
||||
def import_nextcloud_directory(
|
||||
import_type: migration.Migration, file_name: str, session: Session = Depends(generate_session)
|
||||
):
|
||||
def import_migration(import_type: migration.Migration, file_name: str, session: Session = Depends(generate_session)):
|
||||
""" Imports all the recipes in a given directory """
|
||||
file_path = app_dirs.MIGRATION_DIR.joinpath(import_type.value, file_name)
|
||||
migration.migrate(import_type, file_path, session)
|
||||
|
||||
# file_path = app_dirs.MIGRATION_DIR.joinpath(type, file_name)
|
||||
# if type == "nextcloud":
|
||||
# return nextcloud_migrate(session, file_path)
|
||||
# elif type == "chowdown":
|
||||
# return chowdow_migrate(session, file_path)
|
||||
# else:
|
||||
# return SnackResponse.error("Incorrect Migration Type Selected")
|
||||
return migration.migrate(import_type, file_path, session)
|
||||
|
||||
|
||||
@router.delete("/{import_type}/{file_name}/delete")
|
||||
|
|
|
@ -57,7 +57,7 @@ def write_image(recipe_slug: str, file_data: bytes, extension: str) -> Path.name
|
|||
pass
|
||||
|
||||
image_dir = Path(app_dirs.IMG_DIR.joinpath(f"{recipe_slug}"))
|
||||
image_dir.mkdir()
|
||||
image_dir.mkdir(exist_ok=True, parents=True)
|
||||
extension = extension.replace(".", "")
|
||||
image_path = image_dir.joinpath(f"original.{extension}")
|
||||
|
||||
|
|
|
@ -61,20 +61,25 @@ class MigrationBase(BaseModel):
|
|||
yaml_file (Path): Path to yaml file
|
||||
|
||||
Returns:
|
||||
dict: Contains keys "recipe_data" and optional "description"
|
||||
dict: representing the yaml file as a dictionary
|
||||
"""
|
||||
with open(yaml_file, "r") as stream:
|
||||
try:
|
||||
for x, item in enumerate(yaml.load_all(stream, Loader=Loader)):
|
||||
if x == 0:
|
||||
recipe_data = item
|
||||
elif x == 1:
|
||||
recipe_description = str(item)
|
||||
with open(yaml_file, "r") as f:
|
||||
contents = f.read().split("---")
|
||||
recipe_data = {}
|
||||
for x, document in enumerate(contents):
|
||||
|
||||
except yaml.YAMLError:
|
||||
return
|
||||
# Check if None or Empty String
|
||||
if document is None or document == "":
|
||||
continue
|
||||
|
||||
return {"recipe_data": recipe_data, "description": recipe_description or None}
|
||||
# Check if 'title:' present
|
||||
elif "title:" in document:
|
||||
recipe_data.update(yaml.safe_load(document))
|
||||
|
||||
else:
|
||||
recipe_data["description"] = document
|
||||
|
||||
return recipe_data
|
||||
|
||||
@staticmethod
|
||||
def glob_walker(directory: Path, glob_str: str, return_parent=True) -> list[Path]: # TODO:
|
||||
|
|
|
@ -1,93 +1,54 @@
|
|||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import yaml
|
||||
from fastapi.logger import logger
|
||||
from mealie.core.config import app_dirs
|
||||
from mealie.db.database import db
|
||||
from mealie.schema.recipe import Recipe
|
||||
from mealie.services.image.minify import migrate_images
|
||||
from mealie.utils.unzip import unpack_zip
|
||||
from mealie.schema.migration import MigrationImport
|
||||
from mealie.services.migrations._migration_base import (MigrationAlias,
|
||||
MigrationBase)
|
||||
from sqlalchemy.orm.session import Session
|
||||
|
||||
try:
|
||||
from yaml import CLoader as Loader
|
||||
except ImportError:
|
||||
from yaml import Loader
|
||||
|
||||
def process_tags(all_tags):
|
||||
return [x.title() for x in all_tags.split(",")]
|
||||
|
||||
|
||||
def read_chowdown_file(recipe_file: Path) -> Recipe:
|
||||
"""Parse through the yaml file to try and pull out the relavent information.
|
||||
Some issues occur when ":" are used in the text. I have not put a lot of effort
|
||||
into this so there may be better ways of going about it. Currently, I get about 80-90%
|
||||
of recipes from repos I've tried.
|
||||
|
||||
Args:
|
||||
recipe_file (Path): Path to the yaml file
|
||||
|
||||
Returns:
|
||||
Recipe: Recipe class object
|
||||
"""
|
||||
|
||||
with open(recipe_file, "r") as stream:
|
||||
recipe_description: str = str
|
||||
recipe_data: dict = {}
|
||||
try:
|
||||
for x, item in enumerate(yaml.load_all(stream, Loader=Loader)):
|
||||
if x == 0:
|
||||
recipe_data = item
|
||||
elif x == 1:
|
||||
recipe_description = str(item)
|
||||
|
||||
except yaml.YAMLError:
|
||||
return
|
||||
|
||||
reformat_data = {
|
||||
"name": recipe_data.get("title"),
|
||||
"description": recipe_description,
|
||||
"image": recipe_data.get("image", ""),
|
||||
"recipeIngredient": recipe_data.get("ingredients"),
|
||||
"recipeInstructions": recipe_data.get("directions"),
|
||||
"tags": recipe_data.get("tags").split(","),
|
||||
}
|
||||
|
||||
reformated_list = [{"text": instruction} for instruction in reformat_data["recipeInstructions"]]
|
||||
|
||||
reformat_data["recipeInstructions"] = reformated_list
|
||||
|
||||
return Recipe(**reformat_data)
|
||||
def process_instructions(all_instructions):
|
||||
return [{"text": instruction} for instruction in all_instructions]
|
||||
|
||||
|
||||
def chowdown_migrate(session: Session, zip_file: Path):
|
||||
class ChowdownMigration(MigrationBase):
|
||||
key_aliases: Optional[list[MigrationAlias]] = [
|
||||
MigrationAlias(key="name", alias="title", func=None),
|
||||
MigrationAlias(key="recipeIngredient", alias="ingredients", func=None),
|
||||
MigrationAlias(key="recipeInstructions", alias="directions", func=process_instructions),
|
||||
MigrationAlias(key="tags", alias="tags", func=process_tags),
|
||||
]
|
||||
|
||||
temp_dir = unpack_zip(zip_file)
|
||||
|
||||
with temp_dir as dir:
|
||||
def migrate(session: Session, zip_path: Path) -> list[MigrationImport]:
|
||||
cd_migration = ChowdownMigration(migration_file=zip_path, session=session)
|
||||
|
||||
with cd_migration.temp_dir as dir:
|
||||
chow_dir = next(Path(dir).iterdir())
|
||||
image_dir = app_dirs.TEMP_DIR.joinpath(chow_dir, "images")
|
||||
recipe_dir = app_dirs.TEMP_DIR.joinpath(chow_dir, "_recipes")
|
||||
|
||||
failed_recipes = []
|
||||
successful_recipes = []
|
||||
for recipe in recipe_dir.glob("*.md"):
|
||||
try:
|
||||
new_recipe = read_chowdown_file(recipe)
|
||||
db.recipes.create(session, new_recipe.dict())
|
||||
successful_recipes.append(new_recipe.name)
|
||||
except Exception as inst:
|
||||
session.rollback()
|
||||
logger.error(inst)
|
||||
failed_recipes.append(recipe.stem)
|
||||
recipes_as_dicts = [y for x in recipe_dir.glob("*.md") if (y := ChowdownMigration.yaml_reader(x)) is not None]
|
||||
|
||||
failed_images = []
|
||||
for image in image_dir.iterdir():
|
||||
try:
|
||||
if image.stem not in failed_recipes:
|
||||
shutil.copy(image, app_dirs.IMG_DIR.joinpath(image.name))
|
||||
except Exception as inst:
|
||||
logger.error(inst)
|
||||
failed_images.append(image.name)
|
||||
report = {"successful": successful_recipes, "failed": failed_recipes}
|
||||
recipes = [cd_migration.clean_recipe_dictionary(x) for x in recipes_as_dicts]
|
||||
|
||||
migrate_images()
|
||||
return report
|
||||
cd_migration.import_recipes_to_database(recipes)
|
||||
|
||||
recipe_lookup = {r.slug: r for r in recipes}
|
||||
|
||||
for report in cd_migration.migration_report:
|
||||
if report.status:
|
||||
try:
|
||||
original_image = recipe_lookup.get(report.slug).image
|
||||
cd_image = image_dir.joinpath(original_image)
|
||||
except StopIteration:
|
||||
continue
|
||||
if cd_image:
|
||||
ChowdownMigration.import_image(cd_image, report.slug)
|
||||
|
||||
return cd_migration.migration_report
|
||||
|
|
|
@ -3,7 +3,7 @@ from pathlib import Path
|
|||
|
||||
from fastapi.logger import logger
|
||||
from mealie.schema.migration import MigrationImport
|
||||
from mealie.services.migrations import chowdown, nextcloud, nextcloud_new
|
||||
from mealie.services.migrations import chowdown, chowdown, nextcloud
|
||||
from sqlalchemy.orm.session import Session
|
||||
|
||||
|
||||
|
@ -34,11 +34,14 @@ def migrate(migration_type: str, file_path: Path, session: Session) -> list[Migr
|
|||
logger.info(f"Starting Migration from {migration_type}")
|
||||
|
||||
if migration_type == Migration.nextcloud.value:
|
||||
migration_imports = nextcloud_new.migrate(session, file_path)
|
||||
migration_imports = nextcloud.migrate(session, file_path)
|
||||
|
||||
elif migration_type == Migration.chowdown.value:
|
||||
migration_imports = chowdown.chowdown_migrate(session, file_path)
|
||||
migration_imports = chowdown.migrate(session, file_path)
|
||||
|
||||
else:
|
||||
return []
|
||||
|
||||
logger.info(f"Finishing Migration from {migration_type}")
|
||||
|
||||
return None
|
||||
return migration_imports
|
||||
|
|
|
@ -1,27 +1,11 @@
|
|||
import json
|
||||
import logging
|
||||
import shutil
|
||||
import zipfile
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from mealie.core.config import app_dirs
|
||||
from mealie.db.database import db
|
||||
from mealie.schema.recipe import Recipe
|
||||
from mealie.services.image import minify
|
||||
from mealie.services.scraper.cleaner import Cleaner
|
||||
|
||||
|
||||
def process_selection(selection: Path) -> Path:
|
||||
if selection.is_dir():
|
||||
return selection
|
||||
elif selection.suffix == ".zip":
|
||||
with zipfile.ZipFile(selection, "r") as zip_ref:
|
||||
nextcloud_dir = app_dirs.TEMP_DIR.joinpath("nextcloud")
|
||||
nextcloud_dir.mkdir(exist_ok=False, parents=True)
|
||||
zip_ref.extractall(nextcloud_dir)
|
||||
return nextcloud_dir
|
||||
else:
|
||||
return None
|
||||
from mealie.schema.migration import MigrationImport
|
||||
from mealie.services.migrations._migration_base import MigrationAlias, MigrationBase
|
||||
from slugify import slugify
|
||||
from sqlalchemy.orm.session import Session
|
||||
|
||||
|
||||
def clean_nextcloud_tags(nextcloud_tags: str):
|
||||
|
@ -31,67 +15,59 @@ def clean_nextcloud_tags(nextcloud_tags: str):
|
|||
return [x.title().lstrip() for x in nextcloud_tags.split(",") if x != ""]
|
||||
|
||||
|
||||
def import_recipes(recipe_dir: Path) -> Recipe:
|
||||
image = False
|
||||
@dataclass
|
||||
class NextcloudDir:
|
||||
name: str
|
||||
recipe: dict
|
||||
image: Optional[Path]
|
||||
|
||||
for file in recipe_dir.glob("full.*"):
|
||||
image = file
|
||||
break
|
||||
@property
|
||||
def slug(self):
|
||||
return slugify(self.recipe.get("name"))
|
||||
|
||||
for file in recipe_dir.glob("*.json"):
|
||||
recipe_file = file
|
||||
break
|
||||
@classmethod
|
||||
def from_dir(cls, dir: Path):
|
||||
try:
|
||||
json_file = next(dir.glob("*.json"))
|
||||
except StopIteration:
|
||||
return None
|
||||
|
||||
with open(recipe_file, "r") as f:
|
||||
recipe_dict = json.loads(f.read())
|
||||
try: # TODO: There's got to be a better way to do this.
|
||||
image_file = next(dir.glob("full.*"))
|
||||
except StopIteration:
|
||||
image_file = None
|
||||
|
||||
recipe_data = Cleaner.clean(recipe_dict)
|
||||
|
||||
image_name = recipe_data["slug"]
|
||||
recipe_data["image"] = recipe_data["slug"]
|
||||
recipe_data["tags"] = clean_nextcloud_tags(recipe_data.get("keywords"))
|
||||
|
||||
recipe = Recipe(**recipe_data)
|
||||
|
||||
if image:
|
||||
shutil.copy(image, app_dirs.IMG_DIR.joinpath(image_name + image.suffix))
|
||||
|
||||
return recipe
|
||||
return cls(name=dir.name, recipe=NextcloudMigration.json_reader(json_file), image=image_file)
|
||||
|
||||
|
||||
def prep():
|
||||
shutil.rmtree(app_dirs.TEMP_DIR, ignore_errors=True)
|
||||
app_dirs.TEMP_DIR.mkdir(exist_ok=True, parents=True)
|
||||
class NextcloudMigration(MigrationBase):
|
||||
key_aliases: Optional[list[MigrationAlias]] = [
|
||||
MigrationAlias(key="tags", alias="keywords", func=clean_nextcloud_tags)
|
||||
]
|
||||
|
||||
|
||||
def cleanup():
|
||||
shutil.rmtree(app_dirs.TEMP_DIR)
|
||||
def migrate(session: Session, zip_path: Path) -> list[MigrationImport]:
|
||||
|
||||
nc_migration = NextcloudMigration(migration_file=zip_path, session=session)
|
||||
|
||||
def migrate(session, selection: str):
|
||||
prep()
|
||||
app_dirs.MIGRATION_DIR.mkdir(exist_ok=True)
|
||||
selection = app_dirs.MIGRATION_DIR.joinpath(selection)
|
||||
with nc_migration.temp_dir as dir:
|
||||
potential_recipe_dirs = NextcloudMigration.glob_walker(dir, glob_str="**/[!.]*.json", return_parent=True)
|
||||
|
||||
nextcloud_dir = process_selection(selection)
|
||||
nextcloud_dirs = [NextcloudDir.from_dir(x) for x in potential_recipe_dirs]
|
||||
nextcloud_dirs = {x.slug: x for x in nextcloud_dirs}
|
||||
|
||||
successful_imports = []
|
||||
failed_imports = []
|
||||
for dir in nextcloud_dir.iterdir():
|
||||
if dir.is_dir():
|
||||
all_recipes = []
|
||||
for _, nc_dir in nextcloud_dirs.items():
|
||||
recipe = nc_migration.clean_recipe_dictionary(nc_dir.recipe)
|
||||
all_recipes.append(recipe)
|
||||
|
||||
try:
|
||||
recipe = import_recipes(dir)
|
||||
db.recipes.create(session, recipe.dict())
|
||||
nc_migration.import_recipes_to_database(all_recipes)
|
||||
|
||||
successful_imports.append(recipe.name)
|
||||
except Exception:
|
||||
session.rollback()
|
||||
logging.error(f"Failed Nextcloud Import: {dir.name}")
|
||||
logging.exception("")
|
||||
failed_imports.append(dir.name)
|
||||
for report in nc_migration.migration_report:
|
||||
|
||||
cleanup()
|
||||
minify.migrate_images()
|
||||
if report.status:
|
||||
nc_dir: NextcloudDir = nextcloud_dirs[report.slug]
|
||||
if nc_dir.image:
|
||||
NextcloudMigration.import_image(nc_dir.image, nc_dir.slug)
|
||||
|
||||
return {"successful": successful_imports, "failed": failed_imports}
|
||||
return nc_migration.migration_report
|
|
@ -1,73 +0,0 @@
|
|||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from mealie.schema.migration import MigrationImport
|
||||
from mealie.services.migrations._migration_base import MigrationAlias, MigrationBase
|
||||
from slugify import slugify
|
||||
from sqlalchemy.orm.session import Session
|
||||
|
||||
|
||||
def clean_nextcloud_tags(nextcloud_tags: str):
|
||||
if not isinstance(nextcloud_tags, str):
|
||||
return None
|
||||
|
||||
return [x.title().lstrip() for x in nextcloud_tags.split(",") if x != ""]
|
||||
|
||||
|
||||
@dataclass
|
||||
class NextcloudDir:
|
||||
name: str
|
||||
recipe: dict
|
||||
image: Optional[Path]
|
||||
|
||||
@property
|
||||
def slug(self):
|
||||
return slugify(self.recipe["name"])
|
||||
|
||||
@classmethod
|
||||
def from_dir(cls, dir: Path):
|
||||
try:
|
||||
json_file = next(dir.glob("*.json"))
|
||||
except StopIteration:
|
||||
return None
|
||||
|
||||
try: # TODO: There's got to be a better way to do this.
|
||||
image_file = next(dir.glob("full.*"))
|
||||
except StopIteration:
|
||||
image_file = None
|
||||
|
||||
return cls(name=dir.name, recipe=NextcloudMigration.json_reader(json_file), image=image_file)
|
||||
|
||||
|
||||
class NextcloudMigration(MigrationBase):
|
||||
key_aliases: Optional[list[MigrationAlias]] = [
|
||||
MigrationAlias(key="tags", alias="keywords", func=clean_nextcloud_tags)
|
||||
]
|
||||
|
||||
|
||||
def migrate(session: Session, zip_path: Path) -> list[MigrationImport]:
|
||||
|
||||
nc_migration = NextcloudMigration(migration_file=zip_path, session=session)
|
||||
|
||||
with nc_migration.temp_dir as dir:
|
||||
potential_recipe_dirs = NextcloudMigration.glob_walker(dir, glob_str="**/[!.]*.json", return_parent=True)
|
||||
|
||||
nextcloud_dirs = [NextcloudDir.from_dir(x) for x in potential_recipe_dirs]
|
||||
nextcloud_dirs = {x.slug: x for x in nextcloud_dirs}
|
||||
|
||||
all_recipes = []
|
||||
for key, nc_dir in nextcloud_dirs.items():
|
||||
recipe = nc_migration.clean_recipe_dictionary(nc_dir.recipe)
|
||||
print("Key", key)
|
||||
all_recipes.append(recipe)
|
||||
|
||||
nc_migration.import_recipes_to_database(all_recipes)
|
||||
|
||||
for report in nc_migration.migration_report:
|
||||
|
||||
if report.status:
|
||||
print(report)
|
||||
nc_dir: NextcloudDir = nextcloud_dirs[report.slug]
|
||||
if nc_dir.image:
|
||||
NextcloudMigration.import_image(nc_dir.image, nc_dir.slug)
|
|
@ -128,8 +128,10 @@ class Cleaner:
|
|||
|
||||
@staticmethod
|
||||
def ingredient(ingredients: list) -> str:
|
||||
|
||||
return [Cleaner.html(html.unescape(ing)) for ing in ingredients]
|
||||
if ingredients:
|
||||
return [Cleaner.html(html.unescape(ing)) for ing in ingredients]
|
||||
else:
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def yield_amount(yld) -> str:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue