for example 'oil' will now be allowed, previously it would have to be `1 spoon oil` when sometimes people don't know the amount
616 lines
18 KiB
Python
616 lines
18 KiB
Python
from abc import abstractmethod
|
|
import collections
|
|
import re
|
|
from typing import Any, Dict, List, Optional, Self
|
|
|
|
import comfyrecipes.settings as settings
|
|
import comfyrecipes.issues as issues
|
|
|
|
import jsonschema
|
|
import lxml.html.clean
|
|
import mistune
|
|
|
|
|
|
class SafeHTML:
|
|
def __init__(self, clean_html: str) -> None:
|
|
self.html = clean_html
|
|
|
|
@classmethod
|
|
def from_html(cls, dirty_html: str) -> Self:
|
|
cleaner = lxml.html.clean.Cleaner(
|
|
allow_tags=["a", "b", "em", "i", "strong"],
|
|
safe_attrs_only=True,
|
|
safe_attrs=["href"],
|
|
)
|
|
|
|
html = cleaner.clean_html(dirty_html).strip()
|
|
assert isinstance(html, str)
|
|
|
|
assert html.startswith("<div>")
|
|
assert html.endswith("</div>")
|
|
html = html[5:-6]
|
|
|
|
return cls(html)
|
|
|
|
@classmethod
|
|
def from_markdown(cls, markdowntxt: str) -> Self:
|
|
dirty_html = mistune.html(markdowntxt)
|
|
assert isinstance(dirty_html, str)
|
|
return cls.from_html(dirty_html)
|
|
|
|
|
|
class Context:
|
|
def __init__(self) -> None:
|
|
self.settings = settings.Settings()
|
|
self.units: AUnits = FakeUnits(self)
|
|
self.default_unit = Unit(self, "piece", [], [])
|
|
self.ingredients: AIngredients = FakeIngredients(self)
|
|
self.issues = issues.Issues()
|
|
|
|
def load_units(
|
|
self, unitsdct: List[Dict[str, Any]], unitsschema: Dict[str, Any]
|
|
) -> None:
|
|
self.units = Units(self)
|
|
self.units.units.append(self.default_unit)
|
|
jsonschema.validate(instance=unitsdct, schema=unitsschema)
|
|
self.units.load(unitsdct)
|
|
self.units.validate()
|
|
|
|
def load_ingredients(
|
|
self, ingredientsdct: List[Dict[str, Any]], ingredientsschema: Dict[str, Any]
|
|
) -> None:
|
|
self.ingredients = Ingredients(self)
|
|
jsonschema.validate(instance=ingredientsdct, schema=ingredientsschema)
|
|
self.ingredients.load(ingredientsdct)
|
|
|
|
def load_settings(
|
|
self, settingsdct: Dict[str, Any], settingsschema: Dict[str, Any]
|
|
) -> None:
|
|
jsonschema.validate(instance=settingsdct, schema=settingsschema)
|
|
self.settings.load(settingsdct)
|
|
|
|
|
|
class Element:
|
|
def __init__(self, ctx: Context) -> None:
|
|
self.ctx = ctx
|
|
|
|
@classmethod
|
|
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
|
|
return cls(ctx)
|
|
|
|
|
|
class Unit(Element):
|
|
def __init__(
|
|
self,
|
|
ctx: Context,
|
|
name: str,
|
|
conversions: List["Conversion"],
|
|
aliases: List[str],
|
|
dct: Optional[Dict[str, Any]] = None,
|
|
) -> None:
|
|
super().__init__(ctx)
|
|
self.name = name
|
|
self.conversions = conversions
|
|
self.aliases = aliases
|
|
self.dct = dct
|
|
|
|
@classmethod
|
|
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
|
|
# conversions are an empty list which is populated in finish_load
|
|
# which should be ran when all units are registered
|
|
aliases: List[str] = []
|
|
if "aliases" in dct:
|
|
aliases = dct["aliases"]
|
|
|
|
return cls(ctx, name=dct["name"], conversions=[], aliases=aliases, dct=dct)
|
|
|
|
def finish_load(self) -> None:
|
|
if self.dct is None:
|
|
return
|
|
if "conversions" not in self.dct:
|
|
return
|
|
conversions = []
|
|
for conv in self.dct["conversions"]:
|
|
if "from" in conv:
|
|
raise RuntimeError(
|
|
"conversions in units.yaml cannot have a from field, it is automatically assigned from the unit name"
|
|
)
|
|
conv["from"] = self.dct["name"]
|
|
conv = Conversion.from_dict(self.ctx, conv)
|
|
conversions.append(conv)
|
|
|
|
self.conversions = conversions
|
|
|
|
|
|
class AUnits:
|
|
def __init__(self, ctx: Context) -> None:
|
|
self.ctx = ctx
|
|
self.units: List[Unit] = []
|
|
|
|
def load(self, lst: List[Any]) -> None:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def get(self, name: str) -> Optional[Unit]:
|
|
...
|
|
|
|
def validate(self) -> None:
|
|
pass
|
|
|
|
|
|
class FakeUnits(AUnits):
|
|
def get(self, name: str) -> Optional[Unit]:
|
|
for unit in self.units:
|
|
if unit.name == name:
|
|
return unit
|
|
unit = Unit(self.ctx, name, [], [])
|
|
self.units.append(unit)
|
|
return unit
|
|
|
|
|
|
class Units(AUnits):
|
|
def load(self, lst: List[Any]) -> None:
|
|
for unitdct in lst:
|
|
unit = Unit.from_dict(self.ctx, unitdct)
|
|
self.units.append(unit)
|
|
for unit in self.units:
|
|
unit.finish_load()
|
|
|
|
def get(self, name: str) -> Optional[Unit]:
|
|
for unit in self.units:
|
|
if unit.name == name or name in unit.aliases:
|
|
return unit
|
|
return None
|
|
|
|
def validate(self) -> None:
|
|
unitnames = []
|
|
for unit in self.units:
|
|
unitnames.append(unit.name)
|
|
for unitname, num in collections.Counter(unitnames).items():
|
|
if num > 1:
|
|
self.ctx.issues.error(
|
|
issues.ISSUE_DUPLICATE_UNITS,
|
|
f"units.yaml: {unitname} should only have one entry, found {num}",
|
|
)
|
|
|
|
|
|
class IngredientInstance(Element):
|
|
def __init__(
|
|
self,
|
|
ctx: Context,
|
|
name: str,
|
|
amount: Optional[float],
|
|
unit: Optional[Unit],
|
|
alternatives: List["IngredientInstance"],
|
|
note: str,
|
|
price: Optional["PriceDB"],
|
|
) -> None:
|
|
super().__init__(ctx)
|
|
self.name = name
|
|
self.amount = amount
|
|
self.unit = unit
|
|
self.alternatives = alternatives
|
|
self.note = note
|
|
self.price = price
|
|
|
|
@classmethod
|
|
def from_dict(cls, ctx: Context, dct: str | Dict[str, Any]) -> Self:
|
|
if isinstance(dct, str):
|
|
string = dct.strip()
|
|
p = re.compile(r"^(?:([0-9\.]+) ([a-zA-Z]+) )+([\w ]+)(?: \((.*)\))?$")
|
|
match = p.match(string)
|
|
|
|
amount = float(1)
|
|
unit = ctx.default_unit
|
|
name = string
|
|
note = None
|
|
if match is not None:
|
|
amount = float(match.group(1))
|
|
unitstr = match.group(2)
|
|
if unit is not None:
|
|
unitx = ctx.units.get(unitstr)
|
|
if unitx is None:
|
|
ctx.issues.error(issues.ISSUE_UNKNOWN_UNIT, f"unknown unit {unitstr}")
|
|
else:
|
|
unit = unitx
|
|
name = match.group(3)
|
|
note = match.group(4)
|
|
if note is None:
|
|
note = ""
|
|
return cls(
|
|
ctx=ctx,
|
|
name=name,
|
|
amount=amount,
|
|
unit=unit,
|
|
alternatives=[],
|
|
note=note,
|
|
price=None,
|
|
)
|
|
name = dct["name"]
|
|
|
|
ingredient = ctx.ingredients.get(name)
|
|
if ingredient is None:
|
|
ctx.issues.error(
|
|
issues.ISSUE_UNKNOWN_INGREDIENT, f"unknown ingredient {dct['name']}"
|
|
)
|
|
|
|
amount = 1.0
|
|
if "amount" in dct:
|
|
amount = dct["amount"]
|
|
|
|
unit = ctx.default_unit
|
|
if "unit" in dct:
|
|
unitstr = dct["unit"]
|
|
unitx = ctx.units.get(unitstr)
|
|
if unitx is None:
|
|
ctx.issues.error(issues.ISSUE_UNKNOWN_UNIT, "unknown unit {unitstr}")
|
|
else:
|
|
unit = unitx
|
|
|
|
note = ""
|
|
if "note" in dct:
|
|
note = dct["note"]
|
|
|
|
alternatives = []
|
|
if "or" in dct:
|
|
for ingdct in dct["or"]:
|
|
ing = IngredientInstance.from_dict(ctx, ingdct)
|
|
alternatives.append(ing)
|
|
|
|
price = None
|
|
if ingredient is not None:
|
|
price = ingredient.getprice(amount, unit)
|
|
return cls(
|
|
ctx=ctx,
|
|
name=name,
|
|
amount=amount,
|
|
unit=unit,
|
|
alternatives=alternatives,
|
|
note=note,
|
|
price=price,
|
|
)
|
|
|
|
|
|
class StepSection(Element):
|
|
def __init__(self, ctx: Context, title: str, steps: List[str]) -> None:
|
|
self.title = title
|
|
self.steps = steps
|
|
|
|
@classmethod
|
|
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
|
|
return cls(ctx, dct["section"], dct["steps"])
|
|
|
|
|
|
class Recipe(Element):
|
|
def __init__(
|
|
self,
|
|
ctx: Context,
|
|
title: str,
|
|
description: Optional[SafeHTML],
|
|
source: Optional[SafeHTML],
|
|
ingredients: List[IngredientInstance],
|
|
subrecipes: List["Recipe"],
|
|
price: Optional["MultiPriceDB"],
|
|
stepsections: List[StepSection],
|
|
) -> None:
|
|
super().__init__(ctx)
|
|
self.srcpath = ""
|
|
self.outpath = ""
|
|
self.title = title
|
|
self.description = description
|
|
self.ingredients = ingredients
|
|
self.subrecipes = subrecipes
|
|
self.price = price
|
|
self.stepsections = stepsections
|
|
self.source = source
|
|
|
|
@classmethod
|
|
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
|
|
ingredients: List[IngredientInstance] = []
|
|
if "ingredients" in dct:
|
|
for ingdct in dct["ingredients"]:
|
|
ingredient = IngredientInstance.from_dict(ctx, ingdct)
|
|
ingredients.append(ingredient)
|
|
|
|
subrecipes: List[Recipe] = []
|
|
if "subrecipes" in dct:
|
|
for partdct in dct["subrecipes"]:
|
|
rp = Recipe.from_dict(ctx, partdct)
|
|
subrecipes.append(rp)
|
|
|
|
price: Optional[MultiPriceDB] = None
|
|
pricex: float = 0
|
|
ingswithprice = 0
|
|
ingswithoutprice = 0
|
|
currency = None
|
|
for ing in ingredients:
|
|
if ing.price is None:
|
|
ingswithoutprice += 1
|
|
continue
|
|
ingswithprice += 1
|
|
pricex += ing.price.price
|
|
cur_currency = ing.price.currency
|
|
if currency is None:
|
|
currency = cur_currency
|
|
elif currency != cur_currency:
|
|
# we don't know how to convert currencies yet
|
|
currency = None
|
|
break
|
|
if currency is None or len(ingredients) == 0:
|
|
price = None
|
|
else:
|
|
pricedb = PriceDB(
|
|
ctx=ctx,
|
|
price=pricex,
|
|
amount=1,
|
|
unit=ctx.default_unit,
|
|
currency=currency,
|
|
)
|
|
price = MultiPriceDB(
|
|
ctx=ctx,
|
|
pricedb=pricedb,
|
|
item_count=len(ingredients),
|
|
item_prices_missing=ingswithoutprice,
|
|
)
|
|
|
|
stepsections: List[StepSection] = []
|
|
if "steps" in dct:
|
|
defaultstepsection = StepSection(ctx, "default", [])
|
|
stepsections = [defaultstepsection]
|
|
for step in dct["steps"]:
|
|
if isinstance(step, str):
|
|
defaultstepsection.steps.append(step)
|
|
if isinstance(step, dict):
|
|
section = StepSection.from_dict(ctx, step)
|
|
stepsections.append(section)
|
|
|
|
source = None
|
|
if "source" in dct:
|
|
source = SafeHTML.from_markdown(dct["source"])
|
|
|
|
description = None
|
|
if "description" in dct:
|
|
description = SafeHTML.from_markdown(dct["description"])
|
|
|
|
return cls(
|
|
ctx=ctx,
|
|
title=dct["title"],
|
|
description=description,
|
|
source=source,
|
|
ingredients=ingredients,
|
|
subrecipes=subrecipes,
|
|
price=price,
|
|
stepsections=stepsections,
|
|
)
|
|
|
|
|
|
class Conversion(Element):
|
|
def __init__(
|
|
self, ctx: Context, fromunit: Unit, tounit: Unit, ratio: float
|
|
) -> None:
|
|
super().__init__(ctx)
|
|
self.fromunit = fromunit
|
|
self.tounit = tounit
|
|
self.ratio = ratio
|
|
|
|
@classmethod
|
|
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
|
|
fromunit = ctx.units.get(dct["from"])
|
|
if fromunit is None:
|
|
raise RuntimeError(f"unit {dct['from']} doesn't exist")
|
|
|
|
tounit = ctx.units.get(dct["to"])
|
|
if tounit is None:
|
|
raise RuntimeError(f"unit {dct['to']} doesn't exist")
|
|
return cls(ctx, fromunit, tounit, dct["ratio"])
|
|
|
|
|
|
class Ingredient(Element):
|
|
def __init__(
|
|
self,
|
|
ctx: Context,
|
|
name: str,
|
|
wdid: Optional[int],
|
|
prices: List["PriceDB"],
|
|
conversions: List[Conversion],
|
|
aliases: List[str],
|
|
) -> None:
|
|
super().__init__(ctx)
|
|
self.name = name
|
|
self.wdid = wdid
|
|
self.prices = prices
|
|
self.conversions = conversions
|
|
self.aliases = aliases
|
|
|
|
@classmethod
|
|
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
|
|
wdid = None
|
|
if "wdid" in dct:
|
|
wdid = dct["wdid"]
|
|
|
|
prices: List[PriceDB] = []
|
|
if "prices" in dct:
|
|
for pricedct in dct["prices"]:
|
|
prices.append(PriceDB.from_dict(ctx, pricedct))
|
|
|
|
conversions: List[Conversion] = []
|
|
if "conversions" in dct:
|
|
for convdct in dct["conversions"]:
|
|
conversion = Conversion.from_dict(ctx, convdct)
|
|
conversions.append(conversion)
|
|
|
|
aliases: List[str] = []
|
|
if "aliases" in dct:
|
|
aliases = dct["aliases"]
|
|
|
|
return cls(
|
|
ctx,
|
|
name=dct["name"],
|
|
wdid=wdid,
|
|
prices=prices,
|
|
conversions=conversions,
|
|
aliases=aliases,
|
|
)
|
|
|
|
def dfs(
|
|
self,
|
|
conversions: Dict[str, Dict[str, float]],
|
|
startname: str,
|
|
endname: str,
|
|
visited: Optional[List[str]] = None,
|
|
) -> Optional[List[str]]:
|
|
if visited is None:
|
|
visited = []
|
|
if startname == endname:
|
|
return visited + [startname]
|
|
|
|
for nextunit in conversions[startname].keys():
|
|
if nextunit in visited:
|
|
continue
|
|
result = self.dfs(conversions, nextunit, endname, visited + [startname])
|
|
if result is not None:
|
|
return result
|
|
return None
|
|
|
|
def convert(self, amount: float, unitfrom: Unit, unitto: Unit) -> Optional[float]:
|
|
conversions: Dict[str, Dict[str, float]] = collections.defaultdict(dict)
|
|
# construct node tree
|
|
convs = self.conversions
|
|
for unit in self.ctx.units.units:
|
|
convs += unit.conversions
|
|
for conv in convs:
|
|
fromname = conv.fromunit.name
|
|
toname = conv.tounit.name
|
|
conversions[fromname][toname] = conv.ratio
|
|
conversions[toname][fromname] = 1 / conv.ratio
|
|
|
|
# find path between conversions
|
|
path = self.dfs(conversions, unitfrom.name, unitto.name)
|
|
if path is None:
|
|
self.ctx.issues.warn(
|
|
issues.ISSUE_KNOWN_PRICE_UNKNOWN_CONVERSION,
|
|
f"{self.name} has a known price, but conversion {unitfrom.name} -> {unitto.name} not known",
|
|
)
|
|
return None
|
|
assert len(path) != 0
|
|
oldelem = path[0]
|
|
for elem in path[1:]:
|
|
amount *= conversions[oldelem][elem]
|
|
oldelem = elem
|
|
return amount
|
|
|
|
def getprice(self, amount: float, unit: Unit) -> Optional["PriceDB"]:
|
|
prices: List[PriceDB] = []
|
|
pricedbs: List[PriceDB] = self.prices
|
|
for entry in pricedbs:
|
|
assert isinstance(entry, PriceDB)
|
|
entryamount: float = entry.amount
|
|
entryprice: float = entry.price
|
|
entryunit: Unit = entry.unit
|
|
price = 0.0
|
|
if entryunit == unit:
|
|
price = (amount / entryamount) * entryprice
|
|
else:
|
|
pricex = self.convert(amount, unit, entryunit)
|
|
if pricex is not None:
|
|
price = (pricex / entryamount) * entryprice
|
|
newentry = PriceDB(
|
|
self.ctx,
|
|
price=price,
|
|
amount=amount,
|
|
unit=unit,
|
|
currency=entry.currency,
|
|
)
|
|
prices.append(newentry)
|
|
if len(prices) == 0:
|
|
return None
|
|
assert len(prices) == 1
|
|
return prices[0]
|
|
|
|
|
|
class AIngredients:
|
|
def __init__(self, ctx: Context) -> None:
|
|
self.ctx = ctx
|
|
self.ingredients: List[Ingredient] = []
|
|
|
|
def load(self, lst: List[Any]) -> None:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def get(self, name: str) -> Optional[Ingredient]:
|
|
...
|
|
|
|
|
|
class FakeIngredients(AIngredients):
|
|
def get(self, name: str) -> Optional[Ingredient]:
|
|
for ing in self.ingredients:
|
|
if ing.name == name:
|
|
return ing
|
|
ing = Ingredient(self.ctx, name, None, [], [], [])
|
|
self.ingredients.append(ing)
|
|
return ing
|
|
|
|
|
|
class Ingredients(AIngredients):
|
|
def load(self, lst: List[Any]) -> None:
|
|
for ingdct in lst:
|
|
ing = Ingredient.from_dict(self.ctx, ingdct)
|
|
self.ingredients.append(ing)
|
|
|
|
def get(self, name: str) -> Optional[Ingredient]:
|
|
for ing in self.ingredients:
|
|
if ing.name == name or name in ing.aliases:
|
|
return ing
|
|
return None
|
|
|
|
|
|
class PriceDB(Element):
|
|
def __init__(
|
|
self,
|
|
ctx: Context,
|
|
price: float,
|
|
amount: float,
|
|
unit: Unit,
|
|
currency: str,
|
|
) -> None:
|
|
super().__init__(ctx)
|
|
self.price = price
|
|
self.amount = amount
|
|
self.unit = unit
|
|
self.currency = currency
|
|
|
|
@classmethod
|
|
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
|
|
price = dct["price"]
|
|
amount = dct["amount"]
|
|
|
|
unitstr = dct["unit"]
|
|
unitx = ctx.units.get(unitstr)
|
|
# uses the default unit if unit is not known (this won't be
|
|
# visible to the user since rendering will stop after
|
|
# everything is evaluated)
|
|
unit = ctx.default_unit
|
|
if unitx is None:
|
|
ctx.issues.error(issues.ISSUE_UNKNOWN_UNIT, f"unknown unit {unitstr}")
|
|
else:
|
|
unit = unitx
|
|
|
|
currency = ctx.settings.default_currency
|
|
if "currency" in dct:
|
|
currency = dct["currency"]
|
|
if currency is None:
|
|
raise RuntimeError("currency not specified and default_currency is also not set")
|
|
return cls(ctx=ctx, price=price, amount=amount, unit=unit, currency=currency)
|
|
|
|
class MultiPriceDB(Element):
|
|
def __init__(
|
|
self,
|
|
ctx: Context,
|
|
pricedb: PriceDB,
|
|
item_count: int,
|
|
item_prices_missing: int,
|
|
) -> None:
|
|
super().__init__(ctx)
|
|
self.pricedb = pricedb
|
|
self.item_count = item_count
|
|
self.item_prices_missing = item_prices_missing
|