comfy-recipes/recipes.py

401 lines
13 KiB
Python

from abc import abstractmethod
import sys
from typing import Dict, List, Any, Optional, Set
import os
import json
import yaml
import jinja2
import jsonschema
import jsonschema.exceptions
class Context:
def __init__(self) -> None:
self.units = Units(self)
self.default_unit = Unit(self, {"name": "piece"})
self.units.units.append(self.default_unit)
self.ingredients = Ingredients(self)
self.issues: List[str] = []
class Element:
def __init__(self, ctx: Context, dct: Dict[str, Any]) -> None:
self.ctx = ctx
self.dct = dct
self.load(dct)
for elem in self.dct.values():
if isinstance(elem, dict):
raise RuntimeError("Something wasn't processed properly")
def __contains__(self, item: Any) -> bool:
return item in self.dct
def __getitem__(self, key: str) -> Any:
return self.dct[key]
def __setitem__(self, key: str, val: Any) -> None:
self.dct[key] = val
def __repr__(self) -> str:
return self.dct.__repr__()
@abstractmethod
def load(self, dct: Dict[str, Any]) -> None:
...
class Conversion(Element):
def load(self, dct: Dict[str, Any]) -> None:
fromunit = self.ctx.units.get(dct["from"])
if fromunit is None:
raise RuntimeError(f"unit {dct['from']} doesn't exist")
self["from"] = fromunit
tounit = self.ctx.units.get(dct["to"])
if tounit is None:
raise RuntimeError(f"unit {dct['to']} doesn't exist")
self["to"] = tounit
class Unit(Element):
def load(self, dct: Dict[str, Any]) -> None:
oldunits = self.ctx.units.units[:]
self.ctx.units.units.append(self)
aliases: List[str] = []
if "aliases" in dct:
for alias in dct["aliases"]:
aliases.append(alias)
self["aliases"] = aliases
self.ctx.units.units = oldunits
def finish_load(self) -> None:
conversions: List[Conversion] = []
if "conversions" in self.dct:
for convdct in self.dct["conversions"]:
if "from" in self.dct["conversions"]:
raise RuntimeError(
"conversions in units.yaml cannot have a from field, it is automatically assigned from the unit name"
)
convdct["from"] = self["name"]
conversion = Conversion(self.ctx, convdct)
conversions.append(conversion)
self["conversions"] = conversions
class Units:
def __init__(self, ctx: Context) -> None:
self.ctx = ctx
self.units: List[Unit] = []
def load(self, lst: List[Any]) -> None:
for unitdct in lst:
unit = Unit(self.ctx, unitdct)
self.units.append(unit)
for unit in self.units:
unit.finish_load()
def get(self, name: str) -> Optional[Unit]:
for unit in self.units:
if unit["name"] == name or "aliases" in unit and name in unit["aliases"]:
return unit
return None
class Ingredient(Element):
def load(self, dct: Dict[str, Any]) -> None:
if "prices" in dct:
pricedb = PriceDBs(self.ctx)
self.ctx.issues += pricedb.load(dct["prices"])
self["prices"] = pricedb
conversions = []
if "conversions" in dct:
for convdct in dct["conversions"]:
conversion = Conversion(self.ctx, convdct)
conversions.append(conversion)
self["conversions"] = conversions
def convert(
self, amount: float, unitfrom: Unit, unitto: Unit, scannedunits: List[Unit] = []
) -> Optional[float]:
# TODO: this can not solve conversions when we have kg -> g and g -> piece
if unitto == unitfrom:
return amount
conversions = (
self["conversions"] + unitfrom["conversions"] + unitto["conversions"]
)
for conv in conversions:
ratio: float = conv["ratio"]
if (
conv["from"]["name"] == unitfrom["name"]
and conv["to"]["name"] == unitto["name"]
):
return amount * ratio
if (
conv["to"]["name"] == unitfrom["name"]
and conv["from"]["name"] == unitto["name"]
):
return amount / ratio
return None
def getprice(self, amount: float, unit: Unit) -> Optional[float]:
if "prices" not in self.dct:
return None
prices: List[float] = []
pricedbs: PriceDBs = self["prices"]
for entry in pricedbs.pricedbs:
assert isinstance(entry, PriceDB)
entryamount: float = entry["amount"]
entryprice: float = entry["price"]
entryunit: Unit = entry["unit"]
if entryunit == unit:
prices.append((amount / entryamount) * entryprice)
else:
newamount = self.convert(amount, unit, entryunit)
if newamount is not None:
prices.append((newamount / entryamount) * entryprice)
if len(prices) == 0:
return None
assert len(prices) == 1
return prices[0]
class Ingredients:
def __init__(self, ctx: Context) -> None:
self.ctx = ctx
self.ingredients: List[Ingredient] = []
def load(self, lst: List[Any]) -> None:
for ingdct in lst:
ing = Ingredient(self.ctx, ingdct)
self.ingredients.append(ing)
def get(self, name: str) -> Optional[Ingredient]:
for ing in self.ingredients:
if ing["name"] == name or "aliases" in ing and name in ing["aliases"]:
return ing
return None
class PriceDBs:
def __init__(self, ctx: Context) -> None:
self.ctx = ctx
self.pricedbs: List[PriceDB] = []
def load(self, lst: List[Any]) -> List[str]:
for elem in lst:
pricedb = PriceDB(self.ctx, elem)
self.pricedbs.append(pricedb)
return self.ctx.issues
class PriceDB(Element):
def load(self, dct: Dict[str, Any]) -> None:
if "amount" not in dct:
self["amount"] = 1.0
if "unit" in dct:
unitstr = dct["unit"]
self["unit"] = self.ctx.units.get(unitstr)
if self["unit"] is None:
self.ctx.issues.append(f"unknown unit {unitstr}")
else:
self["unit"] = self.ctx.default_unit
class IngredientInstance(Element):
def load(self, dct: Dict[str, Any]) -> None:
ingredient = self.ctx.ingredients.get(dct["name"])
if ingredient is None:
self.ctx.issues.append(f"unknown ingredient {dct['name']}")
self["ingredient"] = ingredient
if "amount" not in dct:
self["amount"] = 1.0
if "unit" in dct:
unitstr = dct["unit"]
self["unit"] = self.ctx.units.get(unitstr)
if self["unit"] is None:
self.ctx.issues.append("unknown unit {unitstr}")
else:
self["unit"] = self.ctx.default_unit
if "note" not in dct:
self["note"] = ""
alternatives = []
if "or" in dct:
for ingdct in dct["or"]:
ing = IngredientInstance(self.ctx, ingdct)
alternatives.append(ing)
self["alternatives"] = alternatives
if ingredient is not None:
self["price"] = ingredient.getprice(self["amount"], self["unit"])
class Recipe(Element):
def __init__(self, ctx: Context, dct: Dict[str, Any]) -> None:
super().__init__(ctx, dct)
self.srcpath = ""
self.outpath = ""
def load(self, dct: Dict[str, Any]) -> None:
ingredients: List[IngredientInstance] = []
if "ingredients" in dct:
for ing in dct["ingredients"]:
ingredient = IngredientInstance(self.ctx, ing)
ingredients.append(ingredient)
self["ingredients"] = ingredients
subrecipes: List[Recipe] = []
if "subrecipes" in dct:
for partdct in dct["subrecipes"]:
rp = Recipe(self.ctx, partdct)
subrecipes.append(rp)
self["subrecipes"] = subrecipes
class Builder:
def __init__(self) -> None:
self.jinjaenv = jinja2.Environment(
loader=jinja2.FileSystemLoader("templates"),
autoescape=jinja2.select_autoescape(),
)
def numprint(input: int) -> str:
out = str(input)
if out.endswith(".0"):
return out.split(".", maxsplit=1)[0]
return out
def amountprint(input: int) -> str:
out = numprint(input)
if out == "0.5":
return "1/2"
if out == "0.25":
return "1/4"
if out == "0.75":
return "3/4"
return out
self.jinjaenv.filters["numprint"] = numprint
self.jinjaenv.filters["amountprint"] = amountprint
self.ctx = Context()
# list of output files that will be built
self.outfiles: Set[str] = set()
def load_file(self, file: str) -> Any:
print(f"loading {file}")
with open(file, encoding="utf-8") as f:
txt = f.read()
if file.endswith(".json"):
return json.loads(txt)
return yaml.safe_load(txt)
def rendertemplate(
self, templatepath: str, format: str, file: str, dir: str, args: Any
) -> None:
template = self.jinjaenv.get_template(templatepath)
print(f"rendering {file}")
outstr = template.render(args)
os.makedirs(f"{dir}/out/{format}", exist_ok=True)
with open(f"{dir}/out/{format}/{file}", "w", encoding="utf-8") as f:
f.write(outstr)
self.outfiles.add(file)
def load(self, dir: str) -> int:
unitsdct = self.load_file(dir + "/units.yaml")
unitsschema = self.load_file("schemas/units.json")
jsonschema.validate(instance=unitsdct, schema=unitsschema)
self.ctx.units.load(unitsdct)
if len(self.ctx.issues) != 0:
for issue in self.ctx.issues:
print("ERROR in units.yaml:", issue)
return 1
ingredientsdct = self.load_file(dir + "/ingredients.yaml")
ingredientsschema = self.load_file("schemas/ingredients.json")
jsonschema.validate(instance=ingredientsdct, schema=ingredientsschema)
self.ctx.ingredients.load(ingredientsdct)
if len(self.ctx.issues) != 0:
for issue in self.ctx.issues:
print("ERROR in ingredients.yaml:", issue)
return 1
return 0
def run(self, dir: str) -> int:
files = []
for _, _, filesx in os.walk(dir + "/recipes"):
files = filesx
files.sort()
recipes: List[Recipe] = []
recipeschema = self.load_file("schemas/recipe.json")
for file in files:
if not file.endswith(".yaml"):
print(f"unknown extension of {file}")
continue
recipedct = self.load_file(dir + "/recipes/" + file)
jsonschema.validate(instance=recipedct, schema=recipeschema)
recipe = Recipe(self.ctx, recipedct)
recipe.srcpath = file
recipe.outpath = file[:-5] + ".html"
recipes.append(recipe)
if len(self.ctx.issues) != 0:
for issue in self.ctx.issues:
print("ERROR", issue)
return 1
self.rendertemplate(
templatepath="index.html",
format="html",
file="index.html",
dir=dir,
args={"recipes": recipes},
)
for recipe in recipes:
self.rendertemplate(
templatepath="recipe.html",
format="html",
file=recipe.outpath,
dir=dir,
args={"recipe": recipe},
)
return 0
def finish(self, dir: str) -> int:
files = set()
for _, _, filesx in os.walk(f"{dir}/out/html"):
files = set(filesx)
# files we did not generate, probably left by a previous run, but not valid anymore
extra_files = files - self.outfiles
for file in extra_files:
print(f"removing obsolete {file}")
os.remove(f"{dir}/out/html/{file}")
return 0
def build(self, path: str) -> int:
fcs = [self.load, self.run, self.finish]
for func in fcs:
try:
ret = func(path)
if ret != 0:
return ret
except jsonschema.exceptions.ValidationError as e:
print("ERROR:", e)
return 1
return 0
if __name__ == "__main__":
builder = Builder()
sys.exit(builder.build("recipes"))