from abc import abstractmethod
import collections
from typing import Any, Dict, List, Optional, Self
import comfyrecipes.settings as settings
import comfyrecipes.issues as issues
import jsonschema
import lxml.html.clean
import mistune
class SafeHTML:
def __init__(self, clean_html: str) -> None:
self.html = clean_html
@classmethod
def from_html(cls, dirty_html: str) -> Self:
cleaner = lxml.html.clean.Cleaner(
allow_tags=["a", "b", "em", "i", "strong"],
safe_attrs_only=True,
safe_attrs=["href"],
)
html = cleaner.clean_html(dirty_html).strip()
assert isinstance(html, str)
assert html.startswith("
")
assert html.endswith("
")
html = html[5:-6]
return cls(html)
@classmethod
def from_markdown(cls, markdowntxt: str) -> Self:
dirty_html = mistune.html(markdowntxt)
assert isinstance(dirty_html, str)
return cls.from_html(dirty_html)
class Context:
    """Shared state handed to every element while data is being loaded.

    A new context starts with ``FakeUnits`` / ``FakeIngredients``
    registries; ``load_units`` and ``load_ingredients`` replace them with
    ``Units`` / ``Ingredients`` registries filled from validated data.
    """

    def __init__(self) -> None:
        self.settings = settings.Settings()
        # Placeholder registry until load_units() is called.
        self.units: AUnits = FakeUnits(self)
        # Unit used whenever an entry names no unit or an unknown one.
        self.default_unit = Unit(self, "piece", [], [])
        # Placeholder registry until load_ingredients() is called.
        self.ingredients: AIngredients = FakeIngredients(self)
        self.issues = issues.Issues()

    def load_units(
        self, unitsdct: List[Dict[str, Any]], unitsschema: Dict[str, Any]
    ) -> None:
        """Replace the unit registry with one loaded from ``unitsdct``.

        The data is checked against ``unitsschema`` with jsonschema before
        it is loaded; the default unit is always part of the new registry.
        """
        registry = Units(self)
        registry.units.append(self.default_unit)
        # The registry is installed on the context before validation and
        # loading, so code running during load() sees it through ctx.units.
        self.units = registry
        jsonschema.validate(instance=unitsdct, schema=unitsschema)
        registry.load(unitsdct)
        registry.validate()

    def load_ingredients(
        self, ingredientsdct: List[Dict[str, Any]], ingredientsschema: Dict[str, Any]
    ) -> None:
        """Replace the ingredient registry with one loaded from ``ingredientsdct``.

        The data is checked against ``ingredientsschema`` with jsonschema
        before it is loaded.
        """
        registry = Ingredients(self)
        self.ingredients = registry
        jsonschema.validate(instance=ingredientsdct, schema=ingredientsschema)
        registry.load(ingredientsdct)

    def load_settings(
        self, settingsdct: Dict[str, Any], settingsschema: Dict[str, Any]
    ) -> None:
        """Validate ``settingsdct`` against ``settingsschema`` and load it."""
        jsonschema.validate(instance=settingsdct, schema=settingsschema)
        self.settings.load(settingsdct)
class Element:
def __init__(self, ctx: Context) -> None:
self.ctx = ctx
@classmethod
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
return cls(ctx)
class Unit(Element):
    """A measurement unit with optional aliases and conversions."""

    def __init__(
        self,
        ctx: Context,
        name: str,
        conversions: List["Conversion"],
        aliases: List[str],
        dct: Optional[Dict[str, Any]] = None,
    ) -> None:
        super().__init__(ctx)
        self.name = name
        self.conversions = conversions
        self.aliases = aliases
        # Raw definition this unit was built from (None for units created
        # in code); finish_load() reads the conversions out of it.
        self.dct = dct

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Create a unit from its raw definition.

        Conversions start as an empty list; they are populated by
        ``finish_load``, which should be run once all units are registered.
        """
        return cls(
            ctx,
            name=dct["name"],
            conversions=[],
            aliases=dct.get("aliases", []),
            dct=dct,
        )

    def finish_load(self) -> None:
        """Build this unit's conversions from its raw definition.

        Does nothing when the unit has no raw definition or the definition
        has no ``conversions`` key.

        Raises:
            RuntimeError: if a conversion entry already contains a ``from``
                field, or (via ``Conversion.from_dict``) if a referenced
                unit does not exist.
        """
        if self.dct is None or "conversions" not in self.dct:
            return
        conversions = []
        for convdct in self.dct["conversions"]:
            if "from" in convdct:
                raise RuntimeError(
                    "conversions in units.yaml cannot have a from field, it is automatically assigned from the unit name"
                )
            # Add the "from" field on a copy: writing it into the stored
            # definition would make a second finish_load() call trip the
            # check above.
            complete = {**convdct, "from": self.dct["name"]}
            conversions.append(Conversion.from_dict(self.ctx, complete))
        self.conversions = conversions
class AUnits:
    """Common base for unit registries.

    Note: ``get`` is decorated with ``abstractmethod``, but this class does
    not use an ABC metaclass, so the decorator documents intent only and
    does not prevent instantiation.
    """

    def __init__(self, ctx: Context) -> None:
        self.units: List[Unit] = []
        self.ctx = ctx

    def load(self, lst: List[Any]) -> None:
        """Load raw unit definitions; the base implementation does nothing."""
        return None

    @abstractmethod
    def get(self, name: str) -> Optional[Unit]:
        """Return the unit called ``name``; subclasses provide the lookup."""

    def validate(self) -> None:
        """Check the registry; the base implementation does nothing."""
        return None
class FakeUnits(AUnits):
    """Unit registry that creates unknown units on demand."""

    def get(self, name: str) -> Optional[Unit]:
        """Return the unit named ``name``, registering a new one if needed.

        Only exact names are matched. A unit created here has no
        conversions and no aliases.
        """
        known = next((unit for unit in self.units if unit.name == name), None)
        if known is not None:
            return known
        created = Unit(self.ctx, name, [], [])
        self.units.append(created)
        return created
class Units(AUnits):
    """Unit registry populated from raw unit definitions."""

    def load(self, lst: List[Any]) -> None:
        """Register every unit in ``lst``, then finish loading all units.

        Loading is done in two passes: all units are registered first, and
        only then is ``finish_load`` called on every registered unit.
        """
        for unitdct in lst:
            self.units.append(Unit.from_dict(self.ctx, unitdct))
        for registered in self.units:
            registered.finish_load()

    def get(self, name: str) -> Optional[Unit]:
        """Return the first unit whose name or aliases match ``name``, else None."""
        return next(
            (
                unit
                for unit in self.units
                if unit.name == name or name in unit.aliases
            ),
            None,
        )

    def validate(self) -> None:
        """Report an error issue for every unit name registered more than once."""
        counts = collections.Counter(unit.name for unit in self.units)
        for unitname, num in counts.items():
            if num <= 1:
                continue
            self.ctx.issues.error(
                issues.ISSUE_DUPLICATE_UNITS,
                f"units.yaml: {unitname} should only have one entry, found {num}",
            )
class IngredientInstance(Element):
    """One use of an ingredient: a name with amount, unit, note and price."""

    def __init__(
        self,
        ctx: Context,
        name: str,
        amount: float,
        unit: Unit,
        alternatives: List["IngredientInstance"],
        note: str,
        price: Optional["PriceDB"],
    ) -> None:
        super().__init__(ctx)
        self.name = name
        self.amount = amount
        self.unit = unit
        self.alternatives = alternatives
        self.note = note
        self.price = price

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build an ingredient instance from its raw definition.

        Missing keys fall back to defaults: amount ``1.0``, the context's
        default unit, an empty note and no alternatives. An unknown
        ingredient or unit is reported through ``ctx.issues`` instead of
        raising. Entries under ``"or"`` are loaded recursively as
        alternatives. The price stays ``None`` when the ingredient is
        unknown.
        """
        name = dct["name"]
        ingredient = ctx.ingredients.get(name)
        if ingredient is None:
            ctx.issues.error(
                issues.ISSUE_UNKNOWN_INGREDIENT, f"unknown ingredient {dct['name']}"
            )

        amount = dct.get("amount", 1.0)

        unit = ctx.default_unit
        if "unit" in dct:
            unitstr = dct["unit"]
            unitx = ctx.units.get(unitstr)
            if unitx is None:
                # Must be an f-string so the message names the offending unit.
                ctx.issues.error(issues.ISSUE_UNKNOWN_UNIT, f"unknown unit {unitstr}")
            else:
                unit = unitx

        note = dct.get("note", "")

        alternatives = []
        if "or" in dct:
            for ingdct in dct["or"]:
                alternatives.append(IngredientInstance.from_dict(ctx, ingdct))

        price = None
        if ingredient is not None:
            price = ingredient.getprice(amount, unit)

        return cls(
            ctx=ctx,
            name=name,
            amount=amount,
            unit=unit,
            alternatives=alternatives,
            note=note,
            price=price,
        )
class StepSection(Element):
    """A titled group of recipe steps."""

    def __init__(self, ctx: Context, title: str, steps: List[str]) -> None:
        # Initialize the Element base so self.ctx is set, as in the other
        # Element subclasses.
        super().__init__(ctx)
        self.title = title
        self.steps = steps

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build a section from a dict with ``section`` and ``steps`` keys."""
        return cls(ctx, dct["section"], dct["steps"])
class Recipe(Element):
    """A recipe: title, texts, ingredients, subrecipes, price and steps."""

    def __init__(
        self,
        ctx: Context,
        title: str,
        description: Optional[SafeHTML],
        source: Optional[SafeHTML],
        ingredients: List[IngredientInstance],
        subrecipes: List["Recipe"],
        price: Optional["PriceDB"],
        stepsections: List[StepSection],
    ) -> None:
        super().__init__(ctx)
        # Paths start out empty; nothing in this class fills them in.
        self.srcpath = ""
        self.outpath = ""
        self.title = title
        self.description = description
        self.source = source
        self.ingredients = ingredients
        self.subrecipes = subrecipes
        self.price = price
        self.stepsections = stepsections

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build a recipe (and, recursively, its subrecipes) from ``dct``.

        The recipe price is the sum of its own ingredients' prices. It is
        only set when there is at least one ingredient, every ingredient
        has a price, and the currency bookkeeping below ends with a
        currency. Subrecipe prices are not added in.

        Steps given as plain strings go into a section titled
        ``"default"``, which is always first when the ``steps`` key is
        present; dict steps become their own sections.
        """
        ingredients: List[IngredientInstance] = [
            IngredientInstance.from_dict(ctx, ingdct)
            for ingdct in dct.get("ingredients", [])
        ]
        subrecipes: List[Recipe] = [
            Recipe.from_dict(ctx, partdct) for partdct in dct.get("subrecipes", [])
        ]

        total: float = 0
        currency = None
        unpriced_seen = False
        for ing in ingredients:
            ingprice = ing.price
            if ingprice is None:
                unpriced_seen = True
                continue
            total += ingprice.price
            if currency is None:
                currency = ingprice.currency
            elif currency != ingprice.currency:
                # we don't know how to convert currencies yet
                currency = None
                break

        price: Optional[PriceDB] = None
        if currency is not None and not unpriced_seen and ingredients:
            price = PriceDB(
                ctx=ctx,
                price=total,
                amount=1,
                unit=ctx.default_unit,
                currency=currency,
            )

        stepsections: List[StepSection] = []
        if "steps" in dct:
            default_section = StepSection(ctx, "default", [])
            stepsections.append(default_section)
            for step in dct["steps"]:
                if isinstance(step, str):
                    default_section.steps.append(step)
                elif isinstance(step, dict):
                    stepsections.append(StepSection.from_dict(ctx, step))

        source = SafeHTML.from_markdown(dct["source"]) if "source" in dct else None
        description = (
            SafeHTML.from_markdown(dct["description"])
            if "description" in dct
            else None
        )

        return cls(
            ctx=ctx,
            title=dct["title"],
            description=description,
            source=source,
            ingredients=ingredients,
            subrecipes=subrecipes,
            price=price,
            stepsections=stepsections,
        )
class Conversion(Element):
    """A conversion ratio from one unit to another."""

    def __init__(
        self, ctx: Context, fromunit: Unit, tounit: Unit, ratio: float
    ) -> None:
        super().__init__(ctx)
        self.fromunit = fromunit
        self.tounit = tounit
        self.ratio = ratio

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build a conversion from a dict with ``from``, ``to`` and ``ratio``.

        Both unit names are resolved through ``ctx.units``.

        Raises:
            RuntimeError: if either unit is not known to ``ctx.units``.
        """

        def resolve(key: str) -> Unit:
            # Look up the unit named under ``key`` or fail loudly.
            found = ctx.units.get(dct[key])
            if found is None:
                raise RuntimeError(f"unit {dct[key]} doesn't exist")
            return found

        fromunit = resolve("from")
        tounit = resolve("to")
        return cls(ctx, fromunit, tounit, dct["ratio"])
class Ingredient(Element):
    """A known ingredient with optional prices, conversions and aliases."""

    def __init__(
        self,
        ctx: Context,
        name: str,
        wdid: Optional[int],
        prices: List["PriceDB"],
        conversions: List[Conversion],
        aliases: List[str],
    ) -> None:
        super().__init__(ctx)
        self.name = name
        self.wdid = wdid
        self.prices = prices
        self.conversions = conversions
        self.aliases = aliases

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build an ingredient from its raw definition.

        Only ``name`` is required; ``wdid`` defaults to ``None`` and
        ``prices``, ``conversions`` and ``aliases`` default to empty lists.
        """
        prices: List[PriceDB] = [
            PriceDB.from_dict(ctx, pricedct) for pricedct in dct.get("prices", [])
        ]
        conversions: List[Conversion] = [
            Conversion.from_dict(ctx, convdct)
            for convdct in dct.get("conversions", [])
        ]
        return cls(
            ctx,
            name=dct["name"],
            wdid=dct.get("wdid"),
            prices=prices,
            conversions=conversions,
            aliases=dct.get("aliases", []),
        )

    def dfs(
        self,
        conversions: Dict[str, Dict[str, float]],
        startname: str,
        endname: str,
        visited: Optional[List[str]] = None,
    ) -> Optional[List[str]]:
        """Depth-first search for a chain of conversions between two units.

        Args:
            conversions: adjacency mapping ``from name -> {to name: ratio}``.
            startname: unit name the search starts from.
            endname: unit name to reach.
            visited: names already on the current path (internal use).

        Returns:
            The unit names from start to end, both included, or ``None``
            when no path exists.
        """
        if visited is None:
            visited = []
        if startname == endname:
            return visited + [startname]
        # .get() keeps an unknown start unit from raising KeyError when
        # ``conversions`` is a plain dict rather than a defaultdict.
        for nextunit in conversions.get(startname, {}):
            if nextunit in visited:
                continue
            result = self.dfs(conversions, nextunit, endname, visited + [startname])
            if result is not None:
                return result
        return None

    def convert(self, amount: float, unitfrom: Unit, unitto: Unit) -> Optional[float]:
        """Convert ``amount`` from ``unitfrom`` to ``unitto``.

        Uses this ingredient's own conversions together with the
        conversions of every unit registered in the context; each
        conversion can be followed in both directions.

        Returns:
            The converted amount, or ``None`` (after recording a warning
            issue) when no chain of conversions links the two units.
        """
        # Copy first: extending self.conversions in place would add the
        # unit conversions to this ingredient again on every call.
        convs = list(self.conversions)
        for unit in self.ctx.units.units:
            convs.extend(unit.conversions)

        # Build the graph of unit names, with the inverse ratio for the
        # reverse direction.
        conversions: Dict[str, Dict[str, float]] = collections.defaultdict(dict)
        for conv in convs:
            fromname = conv.fromunit.name
            toname = conv.tounit.name
            conversions[fromname][toname] = conv.ratio
            conversions[toname][fromname] = 1 / conv.ratio

        path = self.dfs(conversions, unitfrom.name, unitto.name)
        if path is None:
            self.ctx.issues.warn(
                issues.ISSUE_KNOWN_PRICE_UNKNOWN_CONVERSION,
                f"{self.name} has a known price, but conversion {unitfrom.name} -> {unitto.name} not known",
            )
            return None
        assert len(path) != 0

        # Multiply the ratios along the path.
        oldelem = path[0]
        for elem in path[1:]:
            amount *= conversions[oldelem][elem]
            oldelem = elem
        return amount

    def getprice(self, amount: float, unit: Unit) -> Optional["PriceDB"]:
        """Return the price of ``amount`` of this ingredient in ``unit``.

        Returns ``None`` when the ingredient has no price entries. When an
        entry is in a different unit and the conversion is not known, the
        resulting price is left at ``0.0``.

        Raises:
            AssertionError: if the ingredient has more than one price
                entry, or an entry is not a ``PriceDB``.
        """
        prices: List[PriceDB] = []
        for entry in self.prices:
            assert isinstance(entry, PriceDB)
            price = 0.0
            if entry.unit == unit:
                price = (amount / entry.amount) * entry.price
            else:
                converted = self.convert(amount, unit, entry.unit)
                if converted is not None:
                    price = (converted / entry.amount) * entry.price
            prices.append(
                PriceDB(
                    self.ctx,
                    price=price,
                    amount=amount,
                    unit=unit,
                    currency=entry.currency,
                )
            )
        if not prices:
            return None
        assert len(prices) == 1
        return prices[0]
class AIngredients:
    """Common base for ingredient registries.

    Note: ``get`` is decorated with ``abstractmethod``, but this class does
    not use an ABC metaclass, so the decorator documents intent only and
    does not prevent instantiation.
    """

    def __init__(self, ctx: Context) -> None:
        self.ingredients: List[Ingredient] = []
        self.ctx = ctx

    def load(self, lst: List[Any]) -> None:
        """Load raw ingredient definitions; the base implementation does nothing."""
        return None

    @abstractmethod
    def get(self, name: str) -> Optional[Ingredient]:
        """Return the ingredient called ``name``; subclasses provide the lookup."""
class FakeIngredients(AIngredients):
    """Ingredient registry that creates unknown ingredients on demand."""

    def get(self, name: str) -> Optional[Ingredient]:
        """Return the ingredient named ``name``, registering a new one if needed.

        Only exact names are matched. An ingredient created here has no
        wdid, prices, conversions or aliases.
        """
        known = next((ing for ing in self.ingredients if ing.name == name), None)
        if known is not None:
            return known
        created = Ingredient(self.ctx, name, None, [], [], [])
        self.ingredients.append(created)
        return created
class Ingredients(AIngredients):
    """Ingredient registry populated from raw ingredient definitions."""

    def load(self, lst: List[Any]) -> None:
        """Create and register an ingredient for every entry of ``lst``."""
        for ingdct in lst:
            self.ingredients.append(Ingredient.from_dict(self.ctx, ingdct))

    def get(self, name: str) -> Optional[Ingredient]:
        """Return the first ingredient whose name or aliases match ``name``, else None."""
        return next(
            (
                ing
                for ing in self.ingredients
                if ing.name == name or name in ing.aliases
            ),
            None,
        )
class PriceDB(Element):
    """A price for a given amount of a unit, in an optional currency."""

    def __init__(
        self,
        ctx: Context,
        price: float,
        amount: float,
        unit: Unit,
        currency: Optional[str],
    ) -> None:
        super().__init__(ctx)
        self.price = price
        self.amount = amount
        self.unit = unit
        self.currency = currency

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build a price entry from a dict with ``price``, ``amount`` and ``unit``.

        ``currency`` is optional and defaults to the default currency from
        the context settings. An unknown unit is reported through
        ``ctx.issues`` and replaced with the context's default unit.
        """
        price = dct["price"]
        amount = dct["amount"]
        unitstr = dct["unit"]

        found = ctx.units.get(unitstr)
        if found is None:
            # uses the default unit if unit is not known (this won't be
            # visible to the user since rendering will stop after
            # everything is evaluated)
            ctx.issues.error(issues.ISSUE_UNKNOWN_UNIT, f"unknown unit {unitstr}")
            unit = ctx.default_unit
        else:
            unit = found

        currency = dct.get("currency", ctx.settings.default_currency)
        return cls(ctx=ctx, price=price, amount=amount, unit=unit, currency=currency)