from abc import abstractmethod
import collections
import re
from typing import Any, Dict, List, Optional, Self
import comfyrecipes.settings as settings
import comfyrecipes.issues as issues
import jsonschema
import lxml.html.clean
import mistune
class SafeHTML:
def __init__(self, clean_html: str) -> None:
self.html = clean_html
@classmethod
def from_html(cls, dirty_html: str) -> Self:
cleaner = lxml.html.clean.Cleaner(
allow_tags=["a", "b", "em", "i", "strong"],
safe_attrs_only=True,
safe_attrs=["href"],
)
html = cleaner.clean_html(dirty_html).strip()
assert isinstance(html, str)
assert html.startswith("
")
assert html.endswith("
")
html = html[5:-6]
return cls(html)
@classmethod
def from_markdown(cls, markdowntxt: str) -> Self:
dirty_html = mistune.html(markdowntxt)
assert isinstance(dirty_html, str)
return cls.from_html(dirty_html)
class Context:
def __init__(self) -> None:
self.settings = settings.Settings()
self.units: AUnits = FakeUnits(self)
self.default_unit = Unit(self, "piece", [], [])
self.ingredients: AIngredients = FakeIngredients(self)
self.issues = issues.Issues()
def load_units(
self, unitsdct: List[Dict[str, Any]], unitsschema: Dict[str, Any]
) -> None:
self.units = Units(self)
self.units.units.append(self.default_unit)
jsonschema.validate(instance=unitsdct, schema=unitsschema)
self.units.load(unitsdct)
self.units.validate()
def load_ingredients(
self, ingredientsdct: List[Dict[str, Any]], ingredientsschema: Dict[str, Any]
) -> None:
self.ingredients = Ingredients(self)
jsonschema.validate(instance=ingredientsdct, schema=ingredientsschema)
self.ingredients.load(ingredientsdct)
def load_settings(
self, settingsdct: Dict[str, Any], settingsschema: Dict[str, Any]
) -> None:
jsonschema.validate(instance=settingsdct, schema=settingsschema)
self.settings.load(settingsdct)
class Element:
def __init__(self, ctx: Context) -> None:
self.ctx = ctx
@classmethod
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
return cls(ctx)
class Unit(Element):
def __init__(
self,
ctx: Context,
name: str,
conversions: List["Conversion"],
aliases: List[str],
dct: Optional[Dict[str, Any]] = None,
) -> None:
super().__init__(ctx)
self.name = name
self.conversions = conversions
self.aliases = aliases
self.dct = dct
@classmethod
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
# conversions are an empty list which is populated in finish_load
# which should be ran when all units are registered
aliases: List[str] = []
if "aliases" in dct:
aliases = dct["aliases"]
return cls(ctx, name=dct["name"], conversions=[], aliases=aliases, dct=dct)
def finish_load(self) -> None:
if self.dct is None:
return
if "conversions" not in self.dct:
return
conversions = []
for conv in self.dct["conversions"]:
if "from" in conv:
raise RuntimeError(
"conversions in units.yaml cannot have a from field, it is automatically assigned from the unit name"
)
conv["from"] = self.dct["name"]
conv = Conversion.from_dict(self.ctx, conv)
conversions.append(conv)
self.conversions = conversions
class AUnits:
def __init__(self, ctx: Context) -> None:
self.ctx = ctx
self.units: List[Unit] = []
def load(self, lst: List[Any]) -> None:
pass
@abstractmethod
def get(self, name: str) -> Optional[Unit]:
...
def validate(self) -> None:
pass
class FakeUnits(AUnits):
def get(self, name: str) -> Optional[Unit]:
for unit in self.units:
if unit.name == name:
return unit
unit = Unit(self.ctx, name, [], [])
self.units.append(unit)
return unit
class Units(AUnits):
def load(self, lst: List[Any]) -> None:
for unitdct in lst:
unit = Unit.from_dict(self.ctx, unitdct)
self.units.append(unit)
for unit in self.units:
unit.finish_load()
def get(self, name: str) -> Optional[Unit]:
for unit in self.units:
if unit.name == name or name in unit.aliases:
return unit
return None
def validate(self) -> None:
unitnames = []
for unit in self.units:
unitnames.append(unit.name)
for unitname, num in collections.Counter(unitnames).items():
if num > 1:
self.ctx.issues.error(
issues.ISSUE_DUPLICATE_UNITS,
f"units.yaml: {unitname} should only have one entry, found {num}",
)
class IngredientInstance(Element):
def __init__(
self,
ctx: Context,
name: str,
amount: Optional[float],
unit: Optional[Unit],
alternatives: List["IngredientInstance"],
note: str,
price: Optional["PriceDB"],
) -> None:
super().__init__(ctx)
self.name = name
self.amount = amount
self.unit = unit
self.alternatives = alternatives
self.note = note
self.price = price
@classmethod
def from_dict(cls, ctx: Context, dct: str | Dict[str, Any]) -> Self:
if isinstance(dct, str):
string = dct.strip()
p = re.compile(r"^(?:([0-9\.]+) ([a-zA-Z]+) )+([\w ]+)(?: \((.*)\))?$")
match = p.match(string)
amount = float(1)
unit = ctx.default_unit
name = string
note = None
if match is not None:
amount = float(match.group(1))
unitstr = match.group(2)
if unit is not None:
unitx = ctx.units.get(unitstr)
if unitx is None:
ctx.issues.error(issues.ISSUE_UNKNOWN_UNIT, f"unknown unit {unitstr}")
else:
unit = unitx
name = match.group(3)
note = match.group(4)
if note is None:
note = ""
return cls(
ctx=ctx,
name=name,
amount=amount,
unit=unit,
alternatives=[],
note=note,
price=None,
)
name = dct["name"]
ingredient = ctx.ingredients.get(name)
if ingredient is None:
ctx.issues.error(
issues.ISSUE_UNKNOWN_INGREDIENT, f"unknown ingredient {dct['name']}"
)
amount = 1.0
if "amount" in dct:
amount = dct["amount"]
unit = ctx.default_unit
if "unit" in dct:
unitstr = dct["unit"]
unitx = ctx.units.get(unitstr)
if unitx is None:
ctx.issues.error(issues.ISSUE_UNKNOWN_UNIT, "unknown unit {unitstr}")
else:
unit = unitx
note = ""
if "note" in dct:
note = dct["note"]
alternatives = []
if "or" in dct:
for ingdct in dct["or"]:
ing = IngredientInstance.from_dict(ctx, ingdct)
alternatives.append(ing)
price = None
if ingredient is not None:
price = ingredient.getprice(amount, unit)
return cls(
ctx=ctx,
name=name,
amount=amount,
unit=unit,
alternatives=alternatives,
note=note,
price=price,
)
class StepSection(Element):
def __init__(self, ctx: Context, title: str, steps: List[str]) -> None:
self.title = title
self.steps = steps
@classmethod
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
return cls(ctx, dct["section"], dct["steps"])
class Recipe(Element):
def __init__(
self,
ctx: Context,
title: str,
description: Optional[SafeHTML],
source: Optional[SafeHTML],
ingredients: List[IngredientInstance],
subrecipes: List["Recipe"],
price: Optional["MultiPriceDB"],
stepsections: List[StepSection],
) -> None:
super().__init__(ctx)
self.srcpath = ""
self.outpath = ""
self.title = title
self.description = description
self.ingredients = ingredients
self.subrecipes = subrecipes
self.price = price
self.stepsections = stepsections
self.source = source
@classmethod
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
ingredients: List[IngredientInstance] = []
if "ingredients" in dct:
for ingdct in dct["ingredients"]:
ingredient = IngredientInstance.from_dict(ctx, ingdct)
ingredients.append(ingredient)
subrecipes: List[Recipe] = []
if "subrecipes" in dct:
for partdct in dct["subrecipes"]:
rp = Recipe.from_dict(ctx, partdct)
subrecipes.append(rp)
price: Optional[MultiPriceDB] = None
pricex: float = 0
ingswithprice = 0
ingswithoutprice = 0
currency = None
for ing in ingredients:
if ing.price is None:
ingswithoutprice += 1
continue
ingswithprice += 1
pricex += ing.price.price
cur_currency = ing.price.currency
if currency is None:
currency = cur_currency
elif currency != cur_currency:
# we don't know how to convert currencies yet
currency = None
break
if currency is None or len(ingredients) == 0:
price = None
else:
pricedb = PriceDB(
ctx=ctx,
price=pricex,
amount=1,
unit=ctx.default_unit,
currency=currency,
)
price = MultiPriceDB(
ctx=ctx,
pricedb=pricedb,
item_count=len(ingredients),
item_prices_missing=ingswithoutprice,
)
stepsections: List[StepSection] = []
if "steps" in dct:
defaultstepsection = StepSection(ctx, "default", [])
stepsections = [defaultstepsection]
for step in dct["steps"]:
if isinstance(step, str):
defaultstepsection.steps.append(step)
if isinstance(step, dict):
section = StepSection.from_dict(ctx, step)
stepsections.append(section)
source = None
if "source" in dct:
source = SafeHTML.from_markdown(dct["source"])
description = None
if "description" in dct:
description = SafeHTML.from_markdown(dct["description"])
return cls(
ctx=ctx,
title=dct["title"],
description=description,
source=source,
ingredients=ingredients,
subrecipes=subrecipes,
price=price,
stepsections=stepsections,
)
class Conversion(Element):
def __init__(
self, ctx: Context, fromunit: Unit, tounit: Unit, ratio: float
) -> None:
super().__init__(ctx)
self.fromunit = fromunit
self.tounit = tounit
self.ratio = ratio
@classmethod
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
fromunit = ctx.units.get(dct["from"])
if fromunit is None:
raise RuntimeError(f"unit {dct['from']} doesn't exist")
tounit = ctx.units.get(dct["to"])
if tounit is None:
raise RuntimeError(f"unit {dct['to']} doesn't exist")
return cls(ctx, fromunit, tounit, dct["ratio"])
class Ingredient(Element):
def __init__(
self,
ctx: Context,
name: str,
wdid: Optional[int],
prices: List["PriceDB"],
conversions: List[Conversion],
aliases: List[str],
) -> None:
super().__init__(ctx)
self.name = name
self.wdid = wdid
self.prices = prices
self.conversions = conversions
self.aliases = aliases
@classmethod
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
wdid = None
if "wdid" in dct:
wdid = dct["wdid"]
prices: List[PriceDB] = []
if "prices" in dct:
for pricedct in dct["prices"]:
prices.append(PriceDB.from_dict(ctx, pricedct))
conversions: List[Conversion] = []
if "conversions" in dct:
for convdct in dct["conversions"]:
conversion = Conversion.from_dict(ctx, convdct)
conversions.append(conversion)
aliases: List[str] = []
if "aliases" in dct:
aliases = dct["aliases"]
return cls(
ctx,
name=dct["name"],
wdid=wdid,
prices=prices,
conversions=conversions,
aliases=aliases,
)
def dfs(
self,
conversions: Dict[str, Dict[str, float]],
startname: str,
endname: str,
visited: Optional[List[str]] = None,
) -> Optional[List[str]]:
if visited is None:
visited = []
if startname == endname:
return visited + [startname]
for nextunit in conversions[startname].keys():
if nextunit in visited:
continue
result = self.dfs(conversions, nextunit, endname, visited + [startname])
if result is not None:
return result
return None
def convert(self, amount: float, unitfrom: Unit, unitto: Unit) -> Optional[float]:
conversions: Dict[str, Dict[str, float]] = collections.defaultdict(dict)
# construct node tree
convs = self.conversions
for unit in self.ctx.units.units:
convs += unit.conversions
for conv in convs:
fromname = conv.fromunit.name
toname = conv.tounit.name
conversions[fromname][toname] = conv.ratio
conversions[toname][fromname] = 1 / conv.ratio
# find path between conversions
path = self.dfs(conversions, unitfrom.name, unitto.name)
if path is None:
self.ctx.issues.warn(
issues.ISSUE_KNOWN_PRICE_UNKNOWN_CONVERSION,
f"{self.name} has a known price, but conversion {unitfrom.name} -> {unitto.name} not known",
)
return None
assert len(path) != 0
oldelem = path[0]
for elem in path[1:]:
amount *= conversions[oldelem][elem]
oldelem = elem
return amount
def getprice(self, amount: float, unit: Unit) -> Optional["PriceDB"]:
prices: List[PriceDB] = []
pricedbs: List[PriceDB] = self.prices
for entry in pricedbs:
assert isinstance(entry, PriceDB)
entryamount: float = entry.amount
entryprice: float = entry.price
entryunit: Unit = entry.unit
price = 0.0
if entryunit == unit:
price = (amount / entryamount) * entryprice
else:
pricex = self.convert(amount, unit, entryunit)
if pricex is not None:
price = (pricex / entryamount) * entryprice
newentry = PriceDB(
self.ctx,
price=price,
amount=amount,
unit=unit,
currency=entry.currency,
)
prices.append(newentry)
if len(prices) == 0:
return None
assert len(prices) == 1
return prices[0]
class AIngredients:
def __init__(self, ctx: Context) -> None:
self.ctx = ctx
self.ingredients: List[Ingredient] = []
def load(self, lst: List[Any]) -> None:
pass
@abstractmethod
def get(self, name: str) -> Optional[Ingredient]:
...
class FakeIngredients(AIngredients):
def get(self, name: str) -> Optional[Ingredient]:
for ing in self.ingredients:
if ing.name == name:
return ing
ing = Ingredient(self.ctx, name, None, [], [], [])
self.ingredients.append(ing)
return ing
class Ingredients(AIngredients):
def load(self, lst: List[Any]) -> None:
for ingdct in lst:
ing = Ingredient.from_dict(self.ctx, ingdct)
self.ingredients.append(ing)
def get(self, name: str) -> Optional[Ingredient]:
for ing in self.ingredients:
if ing.name == name or name in ing.aliases:
return ing
return None
class PriceDB(Element):
def __init__(
self,
ctx: Context,
price: float,
amount: float,
unit: Unit,
currency: str,
) -> None:
super().__init__(ctx)
self.price = price
self.amount = amount
self.unit = unit
self.currency = currency
@classmethod
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
price = dct["price"]
amount = dct["amount"]
unitstr = dct["unit"]
unitx = ctx.units.get(unitstr)
# uses the default unit if unit is not known (this won't be
# visible to the user since rendering will stop after
# everything is evaluated)
unit = ctx.default_unit
if unitx is None:
ctx.issues.error(issues.ISSUE_UNKNOWN_UNIT, f"unknown unit {unitstr}")
else:
unit = unitx
currency = ctx.settings.default_currency
if "currency" in dct:
currency = dct["currency"]
if currency is None:
raise RuntimeError("currency not specified and default_currency is also not set")
return cls(ctx=ctx, price=price, amount=amount, unit=unit, currency=currency)
class MultiPriceDB(Element):
def __init__(
self,
ctx: Context,
pricedb: PriceDB,
item_count: int,
item_prices_missing: int,
) -> None:
super().__init__(ctx)
self.pricedb = pricedb
self.item_count = item_count
self.item_prices_missing = item_prices_missing