from abc import abstractmethod
import collections
import re
from typing import Any, Dict, List, Optional, Self

import comfyrecipes.settings as settings
import comfyrecipes.issues as issues

import jsonschema
import lxml.html.clean
import mistune


class SafeHTML:
    """HTML fragment that is stored in ``self.html`` for later use.

    Instances are normally built with :meth:`from_html` or
    :meth:`from_markdown`, both of which run the markup through an lxml
    ``Cleaner`` first.
    """

    def __init__(self, clean_html: str) -> None:
        self.html = clean_html

    @classmethod
    def from_html(cls, dirty_html: str) -> Self:
        """Sanitize *dirty_html* and wrap the result in a ``SafeHTML``.

        Only the tags ``a``, ``b``, ``em``, ``i`` and ``strong`` and the
        ``href`` attribute are allowed by the cleaner configuration.
        """
        cleaner = lxml.html.clean.Cleaner(
            allow_tags=["a", "b", "em", "i", "strong"],
            safe_attrs_only=True,
            safe_attrs=["href"],
        )
        html = cleaner.clean_html(dirty_html).strip()
        assert isinstance(html, str)
        # The cleaned fragment is expected to be wrapped in a single <div>;
        # that wrapper is stripped so only the inner markup is kept.
        wrapper_open = "<div>"
        wrapper_close = "</div>"
        assert html.startswith(wrapper_open)
        assert html.endswith(wrapper_close)
        html = html[len(wrapper_open):-len(wrapper_close)]
        return cls(html)

    @classmethod
    def from_markdown(cls, markdowntxt: str) -> Self:
        """Render *markdowntxt* to HTML with mistune, then sanitize it."""
        dirty_html = mistune.html(markdowntxt)
        assert isinstance(dirty_html, str)
        return cls.from_html(dirty_html)


class Context:
    """Shared state for one run: settings, unit/ingredient registries, issues.

    Until ``load_units`` / ``load_ingredients`` are called, the registries
    are ``FakeUnits`` / ``FakeIngredients``, which create entries on demand.
    """

    def __init__(self) -> None:
        self.settings = settings.Settings()
        self.units: AUnits = FakeUnits(self)
        self.default_unit = Unit(self, "piece", [], [])
        self.ingredients: AIngredients = FakeIngredients(self)
        self.issues = issues.Issues()

    def load_units(
        self, unitsdct: List[Dict[str, Any]], unitsschema: Dict[str, Any]
    ) -> None:
        """Validate *unitsdct* against *unitsschema* and load it.

        The default unit is always registered first. Validation runs before
        the registry is replaced, so invalid input leaves ``self.units``
        untouched.
        """
        jsonschema.validate(instance=unitsdct, schema=unitsschema)
        units = Units(self)
        units.units.append(self.default_unit)
        # Must be assigned before load(): conversions resolve their units
        # through ctx.units while loading.
        self.units = units
        units.load(unitsdct)
        units.validate()

    def load_ingredients(
        self, ingredientsdct: List[Dict[str, Any]], ingredientsschema: Dict[str, Any]
    ) -> None:
        """Validate *ingredientsdct* against its schema and load it."""
        jsonschema.validate(instance=ingredientsdct, schema=ingredientsschema)
        self.ingredients = Ingredients(self)
        self.ingredients.load(ingredientsdct)

    def load_settings(
        self, settingsdct: Dict[str, Any], settingsschema: Dict[str, Any]
    ) -> None:
        """Validate *settingsdct* against its schema and load it."""
        jsonschema.validate(instance=settingsdct, schema=settingsschema)
        self.settings.load(settingsdct)


class Element:
    """Base class for model objects that keep a reference to the Context."""

    def __init__(self, ctx: Context) -> None:
        self.ctx = ctx

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build an instance from *dct*; the base version ignores *dct*."""
        return cls(ctx)


class Unit(Element):
    """A measurement unit with optional aliases and conversions."""

    def __init__(
        self,
        ctx: Context,
        name: str,
        conversions: List["Conversion"],
        aliases: List[str],
        dct: Optional[Dict[str, Any]] = None,
    ) -> None:
        super().__init__(ctx)
        self.name = name
        self.conversions = conversions
        self.aliases = aliases
        # Raw source dict, kept so finish_load() can build the conversions.
        self.dct = dct

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Create a unit from *dct* without resolving its conversions."""
        # conversions are an empty list which is populated in finish_load
        # which should be run when all units are registered
        aliases: List[str] = []
        if "aliases" in dct:
            aliases = dct["aliases"]
        return cls(ctx, name=dct["name"], conversions=[], aliases=aliases, dct=dct)

    def finish_load(self) -> None:
        """Resolve the conversions listed in the source dict, if any.

        Raises:
            RuntimeError: if a conversion entry already has a ``from`` field,
                or (via ``Conversion.from_dict``) names an unknown unit.
        """
        if self.dct is None or "conversions" not in self.dct:
            return
        conversions = []
        for conv in self.dct["conversions"]:
            if "from" in conv:
                raise RuntimeError(
                    "conversions in units.yaml cannot have a from field, it is automatically assigned from the unit name"
                )
            # Work on a copy so the caller's dict is not modified and
            # finish_load() can safely be called more than once.
            convdct = {**conv, "from": self.dct["name"]}
            conversions.append(Conversion.from_dict(self.ctx, convdct))
        self.conversions = conversions


class AUnits:
    """Common interface of the unit registries (``FakeUnits``, ``Units``)."""

    # NOTE(review): get() is marked @abstractmethod but this class does not
    # derive from abc.ABC, so the marker is not enforced at instantiation.

    def __init__(self, ctx: Context) -> None:
        self.ctx = ctx
        self.units: List[Unit] = []

    def load(self, lst: List[Any]) -> None:
        pass

    @abstractmethod
    def get(self, name: str) -> Optional[Unit]: ...

    def validate(self) -> None:
        pass


class FakeUnits(AUnits):
    """Registry that creates any unit it is asked for and does not know."""

    def get(self, name: str) -> Optional[Unit]:
        for unit in self.units:
            if unit.name == name:
                return unit
        unit = Unit(self.ctx, name, [], [])
        self.units.append(unit)
        return unit


class Units(AUnits):
    """Registry of units loaded from data; lookups match name or alias."""

    def load(self, lst: List[Any]) -> None:
        """Register every unit in *lst*, then resolve all conversions."""
        for unitdct in lst:
            self.units.append(Unit.from_dict(self.ctx, unitdct))
        # Second pass: conversions may reference units defined later in lst.
        for unit in self.units:
            unit.finish_load()

    def get(self, name: str) -> Optional[Unit]:
        for unit in self.units:
            if unit.name == name or name in unit.aliases:
                return unit
        return None

    def validate(self) -> None:
        """Report an error issue for every unit name registered more than once."""
        counts = collections.Counter(unit.name for unit in self.units)
        for unitname, num in counts.items():
            if num > 1:
                self.ctx.issues.error(
                    issues.ISSUE_DUPLICATE_UNITS,
                    f"units.yaml: {unitname} should only have one entry, found {num}",
                )


class IngredientInstance(Element):
    """One ingredient line of a recipe: name, amount, unit, note, price."""

    # Format: [amount unit ]name[ (note)]
    _STRING_FORMAT = re.compile(
        r"^(?:([0-9\.]+) ([a-zA-Z]+) )?([\w ]+)(?: \((.*)\))?$"
    )

    def __init__(
        self,
        ctx: Context,
        name: str,
        amount: Optional[float],
        unit: Optional[Unit],
        alternatives: List["IngredientInstance"],
        note: str,
        price: Optional["PriceDB"],
    ) -> None:
        super().__init__(ctx)
        self.name = name
        self.amount = amount
        self.unit = unit
        self.alternatives = alternatives
        self.note = note
        self.price = price

    @staticmethod
    def _resolve_unit(ctx: Context, unitstr: str) -> Unit:
        """Look up *unitstr*; report an issue and use the default unit if unknown."""
        unit = ctx.units.get(unitstr)
        if unit is None:
            ctx.issues.error(issues.ISSUE_UNKNOWN_UNIT, f"unknown unit {unitstr}")
            return ctx.default_unit
        return unit

    @classmethod
    def _from_string(cls, ctx: Context, text: str) -> Self:
        """Parse the short string form ``[amount unit ]name[ (note)]``.

        The string form performs no ingredient lookup and carries no price.
        """
        string = text.strip()
        match = cls._STRING_FORMAT.match(string)
        if match is None:
            raise RuntimeError(
                f"ingredient {string} regex not matched, it should be in the format [amount(num) unit(string, one word)] name(string, any number of words) [(note(string))]"
            )
        rawamount, unitstr, name, note = match.groups()

        # Amount and unit are optional as a pair; fall back to the same
        # defaults the dict form uses (1.0 of the default unit).
        amount = 1.0
        if rawamount is not None:
            try:
                amount = float(rawamount)
            except ValueError as err:
                # The pattern accepts e.g. "1.2.3", which is not a number.
                raise RuntimeError(
                    f"ingredient {string}: amount {rawamount} is not a valid number"
                ) from err

        unit = ctx.default_unit
        if unitstr is not None:
            unit = cls._resolve_unit(ctx, unitstr)

        return cls(
            ctx=ctx,
            name=name,
            amount=amount,
            unit=unit,
            alternatives=[],
            note="" if note is None else note,
            price=None,
        )

    @classmethod
    def from_dict(cls, ctx: Context, dct: str | Dict[str, Any]) -> Self:
        """Build an instance from a short string or from a dict.

        Dict keys read: ``name`` (required), ``amount``, ``unit``, ``note``
        and ``or`` (a list of alternative ingredients in the same format).

        Raises:
            RuntimeError: if a string does not match the expected format.
        """
        if isinstance(dct, str):
            return cls._from_string(ctx, dct)

        name = dct["name"]
        ingredient = ctx.ingredients.get(name)
        if ingredient is None:
            ctx.issues.error(
                issues.ISSUE_UNKNOWN_INGREDIENT, f"unknown ingredient {dct['name']}"
            )

        amount = 1.0
        if "amount" in dct:
            amount = dct["amount"]

        unit = ctx.default_unit
        if "unit" in dct:
            unit = cls._resolve_unit(ctx, dct["unit"])

        note = ""
        if "note" in dct:
            note = dct["note"]

        alternatives = []
        if "or" in dct:
            alternatives = [
                IngredientInstance.from_dict(ctx, ingdct) for ingdct in dct["or"]
            ]

        price = None
        if ingredient is not None:
            price = ingredient.getprice(amount, unit)

        return cls(
            ctx=ctx,
            name=name,
            amount=amount,
            unit=unit,
            alternatives=alternatives,
            note=note,
            price=price,
        )


class StepSection(Element):
    """A titled group of preparation steps."""

    def __init__(self, ctx: Context, title: str, steps: List[str]) -> None:
        # Initialise the base class so self.ctx is set like on every
        # other Element.
        super().__init__(ctx)
        self.title = title
        self.steps = steps

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        return cls(ctx, dct["section"], dct["steps"])


class Recipe(Element):
    """A recipe: metadata, ingredients, subrecipes, steps and total price."""

    def __init__(
        self,
        ctx: Context,
        title: str,
        description: Optional[SafeHTML],
        source: Optional[SafeHTML],
        ingredients: List[IngredientInstance],
        subrecipes: List["Recipe"],
        price: Optional["PriceDB"],
        stepsections: List[StepSection],
    ) -> None:
        super().__init__(ctx)
        self.srcpath = ""
        self.outpath = ""
        self.title = title
        self.description = description
        self.ingredients = ingredients
        self.subrecipes = subrecipes
        self.price = price
        self.stepsections = stepsections
        self.source = source

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build a recipe from *dct*.

        Keys read: ``title`` (required), ``ingredients``, ``subrecipes``,
        ``steps``, ``source`` and ``description``. The total price is only
        set when every ingredient has a price and all share one currency.
        """
        ingredients: List[IngredientInstance] = []
        if "ingredients" in dct:
            ingredients = [
                IngredientInstance.from_dict(ctx, ingdct)
                for ingdct in dct["ingredients"]
            ]

        subrecipes: List[Recipe] = []
        if "subrecipes" in dct:
            subrecipes = [
                Recipe.from_dict(ctx, partdct) for partdct in dct["subrecipes"]
            ]

        pricex: float = 0
        allpriced = True
        currency = None
        for ing in ingredients:
            if ing.price is None:
                allpriced = False
                continue
            pricex += ing.price.price
            cur_currency = ing.price.currency
            if currency is None:
                currency = cur_currency
            elif currency != cur_currency:
                # we don't know how to convert currencies yet
                currency = None
                break

        price: Optional[PriceDB] = None
        # currency stays None when there are no priced ingredients at all.
        if currency is not None and allpriced:
            price = PriceDB(
                ctx=ctx,
                price=pricex,
                amount=1,
                unit=ctx.default_unit,
                currency=currency,
            )

        stepsections: List[StepSection] = []
        if "steps" in dct:
            # Plain string steps go into a leading "default" section; dict
            # entries become their own sections after it.
            defaultstepsection = StepSection(ctx, "default", [])
            stepsections = [defaultstepsection]
            for step in dct["steps"]:
                if isinstance(step, str):
                    defaultstepsection.steps.append(step)
                elif isinstance(step, dict):
                    stepsections.append(StepSection.from_dict(ctx, step))

        source = None
        if "source" in dct:
            source = SafeHTML.from_markdown(dct["source"])

        description = None
        if "description" in dct:
            description = SafeHTML.from_markdown(dct["description"])

        return cls(
            ctx=ctx,
            title=dct["title"],
            description=description,
            source=source,
            ingredients=ingredients,
            subrecipes=subrecipes,
            price=price,
            stepsections=stepsections,
        )


class Conversion(Element):
    """Conversion between two units: amount in tounit = amount in fromunit * ratio."""

    def __init__(
        self, ctx: Context, fromunit: Unit, tounit: Unit, ratio: float
    ) -> None:
        super().__init__(ctx)
        self.fromunit = fromunit
        self.tounit = tounit
        self.ratio = ratio

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build a conversion from the keys ``from``, ``to`` and ``ratio``.

        Raises:
            RuntimeError: if either unit is not known to ``ctx.units``.
        """
        fromunit = ctx.units.get(dct["from"])
        if fromunit is None:
            raise RuntimeError(f"unit {dct['from']} doesn't exist")
        tounit = ctx.units.get(dct["to"])
        if tounit is None:
            raise RuntimeError(f"unit {dct['to']} doesn't exist")
        return cls(ctx, fromunit, tounit, dct["ratio"])


class Ingredient(Element):
    """A known ingredient with prices, its own conversions and aliases."""

    def __init__(
        self,
        ctx: Context,
        name: str,
        wdid: Optional[int],
        prices: List["PriceDB"],
        conversions: List[Conversion],
        aliases: List[str],
    ) -> None:
        super().__init__(ctx)
        self.name = name
        self.wdid = wdid
        self.prices = prices
        self.conversions = conversions
        self.aliases = aliases

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build an ingredient from *dct*.

        Keys read: ``name`` (required), ``wdid``, ``prices``,
        ``conversions`` and ``aliases``.
        """
        wdid = None
        if "wdid" in dct:
            wdid = dct["wdid"]

        prices: List[PriceDB] = []
        if "prices" in dct:
            prices = [PriceDB.from_dict(ctx, pricedct) for pricedct in dct["prices"]]

        conversions: List[Conversion] = []
        if "conversions" in dct:
            conversions = [
                Conversion.from_dict(ctx, convdct) for convdct in dct["conversions"]
            ]

        aliases: List[str] = []
        if "aliases" in dct:
            aliases = dct["aliases"]

        return cls(
            ctx,
            name=dct["name"],
            wdid=wdid,
            prices=prices,
            conversions=conversions,
            aliases=aliases,
        )

    def dfs(
        self,
        conversions: Dict[str, Dict[str, float]],
        startname: str,
        endname: str,
        visited: Optional[List[str]] = None,
    ) -> Optional[List[str]]:
        """Depth-first search for a chain of unit names from start to end.

        Returns the list of unit names along the path (both ends included),
        or None when *endname* cannot be reached from *startname*.
        """
        if visited is None:
            visited = []
        if startname == endname:
            return visited + [startname]
        # .get() so that a unit without any conversion neither raises on a
        # plain dict nor inserts an empty entry into a defaultdict.
        for nextunit in conversions.get(startname, {}):
            if nextunit in visited:
                continue
            result = self.dfs(conversions, nextunit, endname, visited + [startname])
            if result is not None:
                return result
        return None

    def convert(self, amount: float, unitfrom: Unit, unitto: Unit) -> Optional[float]:
        """Convert *amount* from *unitfrom* to *unitto*.

        Uses this ingredient's own conversions plus the conversions of every
        registered unit, in both directions. Returns None and records a
        warning issue when no chain of conversions connects the two units.
        """
        conversions: Dict[str, Dict[str, float]] = collections.defaultdict(dict)

        # construct node tree
        # Copy first: extending self.conversions in place would make it grow
        # with the unit conversions on every call.
        convs = list(self.conversions)
        for unit in self.ctx.units.units:
            convs.extend(unit.conversions)
        for conv in convs:
            fromname = conv.fromunit.name
            toname = conv.tounit.name
            conversions[fromname][toname] = conv.ratio
            conversions[toname][fromname] = 1 / conv.ratio

        # find path between conversions
        path = self.dfs(conversions, unitfrom.name, unitto.name)
        if path is None:
            self.ctx.issues.warn(
                issues.ISSUE_KNOWN_PRICE_UNKNOWN_CONVERSION,
                f"{self.name} has a known price, but conversion {unitfrom.name} -> {unitto.name} not known",
            )
            return None
        assert len(path) != 0

        # Multiply the ratios along the path.
        oldelem = path[0]
        for elem in path[1:]:
            amount *= conversions[oldelem][elem]
            oldelem = elem
        return amount

    def getprice(self, amount: float, unit: Unit) -> Optional["PriceDB"]:
        """Return the price of *amount* *unit* of this ingredient.

        Returns None when the ingredient has no price entry or when none of
        its price entries can be converted to *unit*.
        """
        prices: List[PriceDB] = []
        for entry in self.prices:
            assert isinstance(entry, PriceDB)
            if entry.unit == unit:
                converted: Optional[float] = amount
            else:
                converted = self.convert(amount, unit, entry.unit)
            if converted is None:
                # convert() already recorded a warning; an unconvertible
                # entry must not be reported as a price of 0.
                continue
            prices.append(
                PriceDB(
                    self.ctx,
                    price=(converted / entry.amount) * entry.price,
                    amount=amount,
                    unit=unit,
                    currency=entry.currency,
                )
            )
        if len(prices) == 0:
            return None
        # More than one usable price entry is not supported.
        assert len(prices) == 1
        return prices[0]


class AIngredients:
    """Common interface of the ingredient registries."""

    # NOTE(review): get() is marked @abstractmethod but this class does not
    # derive from abc.ABC, so the marker is not enforced at instantiation.

    def __init__(self, ctx: Context) -> None:
        self.ctx = ctx
        self.ingredients: List[Ingredient] = []

    def load(self, lst: List[Any]) -> None:
        pass

    @abstractmethod
    def get(self, name: str) -> Optional[Ingredient]: ...
class FakeIngredients(AIngredients):
    """Ingredient registry that invents an empty entry for unknown names."""

    def get(self, name: str) -> Optional[Ingredient]:
        """Return the ingredient called *name*, creating it when missing."""
        known = next(
            (candidate for candidate in self.ingredients if candidate.name == name),
            None,
        )
        if known is not None:
            return known
        # Not registered yet: add a bare ingredient without wdid, prices,
        # conversions or aliases.
        created = Ingredient(self.ctx, name, None, [], [], [])
        self.ingredients.append(created)
        return created


class Ingredients(AIngredients):
    """Ingredient registry filled from loaded data."""

    def load(self, lst: List[Any]) -> None:
        """Append one ``Ingredient`` per entry of *lst*, in order."""
        for entry in lst:
            self.ingredients.append(Ingredient.from_dict(self.ctx, entry))

    def get(self, name: str) -> Optional[Ingredient]:
        """Return the first ingredient whose name or aliases match *name*."""
        for candidate in self.ingredients:
            if candidate.name == name:
                return candidate
            if name in candidate.aliases:
                return candidate
        return None


class PriceDB(Element):
    """Price of a given amount of a unit, in a given currency."""

    def __init__(
        self,
        ctx: Context,
        price: float,
        amount: float,
        unit: Unit,
        currency: str,
    ) -> None:
        super().__init__(ctx)
        self.price = price
        self.amount = amount
        self.unit = unit
        self.currency = currency

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build a price entry from the keys ``price``, ``amount``, ``unit``
        and the optional ``currency``.

        Raises:
            RuntimeError: if neither the entry nor the settings provide a
                currency.
        """
        price_value = dct["price"]
        amount_value = dct["amount"]
        unit_name = dct["unit"]

        resolved = ctx.units.get(unit_name)
        if resolved is None:
            # uses the default unit if unit is not known (this won't be
            # visible to the user since rendering will stop after
            # everything is evaluated)
            ctx.issues.error(issues.ISSUE_UNKNOWN_UNIT, f"unknown unit {unit_name}")
            resolved = ctx.default_unit

        # An explicit currency on the entry wins over the configured default.
        fallback_currency = ctx.settings.default_currency
        chosen_currency = dct["currency"] if "currency" in dct else fallback_currency
        if chosen_currency is None:
            raise RuntimeError("currency not specified and default_currency is also not set")

        return cls(
            ctx=ctx,
            price=price_value,
            amount=amount_value,
            unit=resolved,
            currency=chosen_currency,
        )