from abc import abstractmethod
import collections
from typing import Any, Dict, List, Optional, Self

import jsonschema

import comfyrecipes.settings as settings
import comfyrecipes.issues as issues


class Context:
    """Shared state handed to every parsed element.

    Holds the settings, the unit and ingredient registries, the default
    unit ("piece") and the issue collector. Until ``load_units`` /
    ``load_ingredients`` are called, the registries are the ``Fake*``
    variants, which create entries on demand.
    """

    def __init__(self) -> None:
        self.settings = settings.Settings()
        self.units: AUnits = FakeUnits(self)
        self.default_unit = Unit(self, "piece", [], [])
        self.ingredients: AIngredients = FakeIngredients(self)
        self.issues = issues.Issues()

    def load_units(
        self, unitsdct: List[Dict[str, Any]], unitsschema: Dict[str, Any]
    ) -> None:
        """Replace the unit registry with one loaded from ``unitsdct``.

        The default unit is registered first, then the data is validated
        against ``unitsschema`` (``jsonschema.validate`` raises on
        failure), loaded and checked for duplicate names.
        """
        self.units = Units(self)
        self.units.units.append(self.default_unit)
        jsonschema.validate(instance=unitsdct, schema=unitsschema)
        self.units.load(unitsdct)
        self.units.validate()

    def load_ingredients(
        self, ingredientsdct: List[Dict[str, Any]], ingredientsschema: Dict[str, Any]
    ) -> None:
        """Replace the ingredient registry with one loaded from ``ingredientsdct``.

        The data is validated against ``ingredientsschema`` before loading.
        """
        self.ingredients = Ingredients(self)
        jsonschema.validate(instance=ingredientsdct, schema=ingredientsschema)
        self.ingredients.load(ingredientsdct)

    def load_settings(
        self, settingsdct: Dict[str, Any], settingsschema: Dict[str, Any]
    ) -> None:
        """Validate ``settingsdct`` against its schema and load it into the settings."""
        jsonschema.validate(instance=settingsdct, schema=settingsschema)
        self.settings.load(settingsdct)


class Element:
    """Base class of all parsed objects; stores the owning ``Context``."""

    def __init__(self, ctx: Context) -> None:
        self.ctx = ctx

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build an instance from a parsed dict (the base version ignores ``dct``)."""
        return cls(ctx)


class Unit(Element):
    """A measurement unit with optional aliases and conversions to other units."""

    def __init__(
        self,
        ctx: Context,
        name: str,
        conversions: List["Conversion"],
        aliases: List[str],
        dct: Optional[Dict[str, Any]] = None,
    ) -> None:
        super().__init__(ctx)
        self.name = name
        self.conversions = conversions
        self.aliases = aliases
        # Raw source dict, kept so finish_load() can resolve conversions later.
        self.dct = dct

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Create a unit from its dict; conversions are resolved in ``finish_load``."""
        # conversions are an empty list which is populated in finish_load
        # which should be ran when all units are registered
        aliases: List[str] = dct.get("aliases", [])
        return cls(ctx, name=dct["name"], conversions=[], aliases=aliases, dct=dct)

    def finish_load(self) -> None:
        """Resolve this unit's conversions once all units are registered.

        Does nothing when the unit has no source dict or no "conversions"
        key.

        Raises:
            RuntimeError: if a conversion entry carries its own "from"
                field, or (from ``Conversion.from_dict``) if a referenced
                unit is unknown.
        """
        if self.dct is None or "conversions" not in self.dct:
            return
        conversions: List[Conversion] = []
        for convdct in self.dct["conversions"]:
            if "from" in convdct:
                raise RuntimeError(
                    "conversions in units.yaml cannot have a from field, it is automatically assigned from the unit name"
                )
            # Build a copy instead of writing "from" into the caller's data,
            # so loading the same dicts twice does not trip the check above.
            resolved = {**convdct, "from": self.dct["name"]}
            conversions.append(Conversion.from_dict(self.ctx, resolved))
        self.conversions = conversions


class AUnits:
    """Common interface of unit registries.

    NOTE: ``get`` is marked abstract, but the class does not derive from
    ``abc.ABC``, so this is not enforced at instantiation time.
    """

    def __init__(self, ctx: Context) -> None:
        self.ctx = ctx
        self.units: List[Unit] = []

    def load(self, lst: List[Any]) -> None:
        """Load units from a list of dicts (no-op in the base class)."""
        pass

    @abstractmethod
    def get(self, name: str) -> Optional[Unit]:
        """Return the unit registered under ``name``, if any."""
        ...

    def validate(self) -> None:
        """Check the registry for problems (no-op in the base class)."""
        pass


class FakeUnits(AUnits):
    """Registry that accepts any unit name, creating units on first use."""

    def get(self, name: str) -> Optional[Unit]:
        """Return the unit called ``name``, creating and registering it if missing."""
        for known in self.units:
            if known.name == name:
                return known
        created = Unit(self.ctx, name, [], [])
        self.units.append(created)
        return created


class Units(AUnits):
    """Registry backed by loaded unit definitions."""

    def load(self, lst: List[Any]) -> None:
        """Register every unit in ``lst``, then resolve conversions of all units."""
        for unitdct in lst:
            self.units.append(Unit.from_dict(self.ctx, unitdct))
        # Second pass: conversions may reference units defined later in lst.
        for unit in self.units:
            unit.finish_load()

    def get(self, name: str) -> Optional[Unit]:
        """Return the unit whose name or alias equals ``name``, else None."""
        for unit in self.units:
            if unit.name == name or name in unit.aliases:
                return unit
        return None

    def validate(self) -> None:
        """Report an error issue for every unit name registered more than once."""
        counts = collections.Counter(unit.name for unit in self.units)
        for unitname, num in counts.items():
            if num > 1:
                self.ctx.issues.error(
                    issues.ISSUE_DUPLICATE_UNITS,
                    f"units.yaml: {unitname} should only have one entry, found {num}",
                )


class IngredientInstance(Element):
    """One use of an ingredient in a recipe: amount, unit, note, alternatives, price."""

    def __init__(
        self,
        ctx: Context,
        name: str,
        amount: float,
        unit: Unit,
        alternatives: List["IngredientInstance"],
        note: str,
        price: Optional["PriceDB"],
    ) -> None:
        super().__init__(ctx)
        self.name = name
        self.amount = amount
        self.unit = unit
        self.alternatives = alternatives
        self.note = note
        self.price = price

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Parse an ingredient entry of a recipe.

        Unknown ingredients and unknown units are reported through
        ``ctx.issues`` rather than raised. Missing "amount" defaults to
        1.0, missing or unknown "unit" falls back to the context's default
        unit, missing "note" to "". Entries under "or" are parsed
        recursively as alternatives. The price is computed only when the
        ingredient is known.
        """
        name = dct["name"]
        ingredient = ctx.ingredients.get(name)
        if ingredient is None:
            ctx.issues.error(
                issues.ISSUE_UNKNOWN_INGREDIENT, f"unknown ingredient {dct['name']}"
            )

        amount = dct.get("amount", 1.0)

        unit = ctx.default_unit
        if "unit" in dct:
            unitstr = dct["unit"]
            found = ctx.units.get(unitstr)
            if found is None:
                # The original message lacked the f-prefix and printed the
                # literal "{unitstr}" placeholder.
                ctx.issues.error(issues.ISSUE_UNKNOWN_UNIT, f"unknown unit {unitstr}")
            else:
                unit = found

        note = dct.get("note", "")

        alternatives = [
            IngredientInstance.from_dict(ctx, ingdct) for ingdct in dct.get("or", [])
        ]

        price = None
        if ingredient is not None:
            price = ingredient.getprice(amount, unit)

        return cls(
            ctx=ctx,
            name=name,
            amount=amount,
            unit=unit,
            alternatives=alternatives,
            note=note,
            price=price,
        )


class StepSection(Element):
    """A titled group of preparation steps."""

    def __init__(self, ctx: Context, title: str, steps: List[str]) -> None:
        # Initialise the base class so self.ctx is set, like every other Element.
        super().__init__(ctx)
        self.title = title
        self.steps = steps

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build a section from a dict with "section" (title) and "steps" keys."""
        return cls(ctx, dct["section"], dct["steps"])


class Recipe(Element):
    """A recipe: title, ingredients, nested subrecipes, total price and steps."""

    def __init__(
        self,
        ctx: Context,
        title: str,
        ingredients: List[IngredientInstance],
        subrecipes: List["Recipe"],
        price: Optional["PriceDB"],
        stepsections: List[StepSection],
    ) -> None:
        super().__init__(ctx)
        self.srcpath = ""
        self.outpath = ""
        self.title = title
        self.ingredients = ingredients
        self.subrecipes = subrecipes
        self.price = price
        self.stepsections = stepsections

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Parse a recipe dict.

        The total price is the sum of the ingredient prices. It is None
        when there are no ingredients, when any ingredient lacks a price,
        or when the ingredient prices do not share one known currency.
        Plain-string steps go into a leading "default" section; dict steps
        become their own sections.
        """
        ingredients: List[IngredientInstance] = [
            IngredientInstance.from_dict(ctx, ingdct)
            for ingdct in dct.get("ingredients", [])
        ]
        subrecipes: List[Recipe] = [
            Recipe.from_dict(ctx, partdct) for partdct in dct.get("subrecipes", [])
        ]

        total: float = 0
        ingswithoutprice = 0
        currency = None
        for ing in ingredients:
            if ing.price is None:
                ingswithoutprice += 1
                continue
            total += ing.price.price
            cur_currency = ing.price.currency
            if currency is None:
                currency = cur_currency
            elif currency != cur_currency:
                # we don't know how to convert currencies yet
                currency = None
                break

        price: Optional[PriceDB] = None
        if currency is not None and ingswithoutprice == 0 and len(ingredients) != 0:
            price = PriceDB(
                ctx=ctx,
                price=total,
                amount=1,
                unit=ctx.default_unit,
                currency=currency,
            )

        stepsections: List[StepSection] = []
        if "steps" in dct:
            defaultstepsection = StepSection(ctx, "default", [])
            stepsections = [defaultstepsection]
            for step in dct["steps"]:
                if isinstance(step, str):
                    defaultstepsection.steps.append(step)
                if isinstance(step, dict):
                    stepsections.append(StepSection.from_dict(ctx, step))

        return cls(
            ctx=ctx,
            title=dct["title"],
            ingredients=ingredients,
            subrecipes=subrecipes,
            price=price,
            stepsections=stepsections,
        )


class Conversion(Element):
    """A conversion between two units: ``fromunit`` to ``tounit`` multiplies by ``ratio``."""

    def __init__(
        self, ctx: Context, fromunit: Unit, tounit: Unit, ratio: float
    ) -> None:
        super().__init__(ctx)
        self.fromunit = fromunit
        self.tounit = tounit
        self.ratio = ratio

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build a conversion from a dict with "from", "to" and "ratio" keys.

        Raises:
            RuntimeError: if either unit is not found in ``ctx.units``.
        """
        fromunit = ctx.units.get(dct["from"])
        if fromunit is None:
            raise RuntimeError(f"unit {dct['from']} doesn't exist")
        tounit = ctx.units.get(dct["to"])
        if tounit is None:
            raise RuntimeError(f"unit {dct['to']} doesn't exist")
        return cls(ctx, fromunit, tounit, dct["ratio"])


class Ingredient(Element):
    """A known ingredient with optional prices, own conversions and aliases."""

    def __init__(
        self,
        ctx: Context,
        name: str,
        wdid: Optional[int],
        prices: List["PriceDB"],
        conversions: List[Conversion],
        aliases: List[str],
    ) -> None:
        super().__init__(ctx)
        self.name = name
        self.wdid = wdid
        self.prices = prices
        self.conversions = conversions
        self.aliases = aliases

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Parse an ingredient definition.

        Only "name" is required; "wdid", "prices", "conversions" and
        "aliases" are optional.
        """
        prices: List[PriceDB] = [
            PriceDB.from_dict(ctx, pricedct) for pricedct in dct.get("prices", [])
        ]
        conversions: List[Conversion] = [
            Conversion.from_dict(ctx, convdct)
            for convdct in dct.get("conversions", [])
        ]
        aliases: List[str] = dct.get("aliases", [])
        return cls(
            ctx,
            name=dct["name"],
            wdid=dct.get("wdid"),
            prices=prices,
            conversions=conversions,
            aliases=aliases,
        )

    def dfs(
        self,
        conversions: Dict[str, Dict[str, float]],
        startname: str,
        endname: str,
        visited: Optional[List[str]] = None,
    ) -> Optional[List[str]]:
        """Depth-first search for a chain of unit names from start to end.

        Args:
            conversions: adjacency mapping ``from -> {to: ratio}``.
            startname: unit name to start from.
            endname: unit name to reach.
            visited: names already on the current path (internal).

        Returns:
            The path including both endpoints, or None if no path exists.
        """
        if visited is None:
            visited = []
        if startname == endname:
            return visited + [startname]
        for nextunit in conversions[startname]:
            if nextunit in visited:
                continue
            result = self.dfs(conversions, nextunit, endname, visited + [startname])
            if result is not None:
                return result
        return None

    def convert(self, amount: float, unitfrom: Unit, unitto: Unit) -> Optional[float]:
        """Convert ``amount`` from ``unitfrom`` to ``unitto``.

        Uses this ingredient's conversions together with the conversions
        of every registered unit, in both directions. When no chain of
        conversions connects the two units, a warning issue is recorded
        and None is returned.
        """
        graph: Dict[str, Dict[str, float]] = collections.defaultdict(dict)

        # Copy first: the original aliased self.conversions and extended it
        # in place with `+=`, so the ingredient's own list grew on every call.
        convs: List[Conversion] = list(self.conversions)
        for unit in self.ctx.units.units:
            convs.extend(unit.conversions)

        # construct node tree
        for conv in convs:
            fromname = conv.fromunit.name
            toname = conv.tounit.name
            graph[fromname][toname] = conv.ratio
            graph[toname][fromname] = 1 / conv.ratio

        # find path between conversions
        path = self.dfs(graph, unitfrom.name, unitto.name)
        if path is None:
            self.ctx.issues.warn(
                issues.ISSUE_KNOWN_PRICE_UNKNOWN_CONVERSION,
                f"{self.name} has a known price, but conversion {unitfrom.name} -> {unitto.name} not known",
            )
            return None
        assert len(path) != 0

        # Multiply through every hop of the path.
        previous = path[0]
        for current in path[1:]:
            amount *= graph[previous][current]
            previous = current
        return amount

    def getprice(self, amount: float, unit: Unit) -> Optional["PriceDB"]:
        """Return the price of ``amount`` of this ingredient in ``unit``.

        Each price entry is scaled to the requested amount, converting
        units when the entry's unit differs. Entries whose unit cannot be
        converted are skipped (``convert`` has already recorded a warning)
        instead of being reported as a price of 0.

        Returns:
            A ``PriceDB`` for the requested amount and unit, or None when
            no usable price entry exists.

        Raises:
            AssertionError: if more than one usable price entry exists.
        """
        prices: List[PriceDB] = []
        for entry in self.prices:
            assert isinstance(entry, PriceDB)
            if entry.unit == unit:
                converted: Optional[float] = amount
            else:
                converted = self.convert(amount, unit, entry.unit)
            if converted is None:
                # Unknown conversion: the price is unknown, not zero.
                continue
            prices.append(
                PriceDB(
                    self.ctx,
                    price=(converted / entry.amount) * entry.price,
                    amount=amount,
                    unit=unit,
                    currency=entry.currency,
                )
            )
        if len(prices) == 0:
            return None
        assert len(prices) == 1
        return prices[0]


class AIngredients:
    """Common interface of ingredient registries.

    NOTE: ``get`` is marked abstract, but the class does not derive from
    ``abc.ABC``, so this is not enforced at instantiation time.
    """

    def __init__(self, ctx: Context) -> None:
        self.ctx = ctx
        self.ingredients: List[Ingredient] = []

    def load(self, lst: List[Any]) -> None:
        """Load ingredients from a list of dicts (no-op in the base class)."""
        pass

    @abstractmethod
    def get(self, name: str) -> Optional[Ingredient]:
        """Return the ingredient registered under ``name``, if any."""
        ...


class FakeIngredients(AIngredients):
    """Registry that accepts any ingredient name, creating entries on first use."""

    def get(self, name: str) -> Optional[Ingredient]:
        """Return the ingredient called ``name``, creating and registering it if missing."""
        for known in self.ingredients:
            if known.name == name:
                return known
        created = Ingredient(self.ctx, name, None, [], [], [])
        self.ingredients.append(created)
        return created


class Ingredients(AIngredients):
    """Registry backed by loaded ingredient definitions."""

    def load(self, lst: List[Any]) -> None:
        """Parse and register every ingredient dict in ``lst``."""
        for ingdct in lst:
            self.ingredients.append(Ingredient.from_dict(self.ctx, ingdct))

    def get(self, name: str) -> Optional[Ingredient]:
        """Return the ingredient whose name or alias equals ``name``, else None."""
        for ing in self.ingredients:
            if ing.name == name or name in ing.aliases:
                return ing
        return None


class PriceDB(Element):
    """A price: ``price`` in ``currency`` for ``amount`` of ``unit``."""

    def __init__(
        self,
        ctx: Context,
        price: float,
        amount: float,
        unit: Unit,
        currency: Optional[str],
    ) -> None:
        super().__init__(ctx)
        self.price = price
        self.amount = amount
        self.unit = unit
        self.currency = currency

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Parse a price entry with "price", "amount", "unit" and optional "currency".

        An unknown unit is reported through ``ctx.issues`` and replaced by
        the default unit; a missing currency falls back to the settings'
        default currency.
        """
        price = dct["price"]
        amount = dct["amount"]
        unitstr = dct["unit"]

        # uses the default unit if unit is not known (this won't be
        # visible to the user since rendering will stop after
        # everything is evaluated)
        unit = ctx.default_unit
        found = ctx.units.get(unitstr)
        if found is None:
            ctx.issues.error(issues.ISSUE_UNKNOWN_UNIT, f"unknown unit {unitstr}")
        else:
            unit = found

        currency = ctx.settings.default_currency
        if "currency" in dct:
            currency = dct["currency"]

        return cls(ctx=ctx, price=price, amount=amount, unit=unit, currency=currency)