"""Build static HTML recipe pages from YAML sources.

The ``build DIR`` command reads ``DIR/settings.yaml``, ``DIR/units.yaml``,
``DIR/ingredients.yaml`` (each optional) and every ``DIR/recipes/*.yaml``
file, validates them against the JSON schemas in ``schemas/`` and renders
the Jinja2 templates from ``templates/`` into ``DIR/out/html``.
"""

from abc import abstractmethod
from collections import defaultdict
import collections
import sys
from typing import Dict, List, Any, Optional, Self, Set
import os
import json

import yaml
import jinja2
import jsonschema
import jsonschema.exceptions

ISSUE_UNKNOWN_UNIT = "unknown-unit"
ISSUE_UNKNOWN_INGREDIENT = "unknown-ingredient"
ISSUE_DUPLICATE_UNITS = "duplicate-units"
ISSUE_KNOWN_PRICE_UNKNOWN_CONVERSION = "known-price-unknown-conversion"


class Issue:
    """A single diagnostic: a stable identifier plus a human-readable message."""

    def __init__(self, id: str, msg: str) -> None:
        self.id = id
        self.msg = msg


class Issues:
    """Collector for errors and warnings raised while loading data."""

    def __init__(self) -> None:
        self.errors: List[Issue] = []
        self.warnings: List[Issue] = []

    def error(self, id: str, msg: str) -> None:
        """Record an error with the given issue id and message."""
        self.errors.append(Issue(id, msg))

    def warn(self, id: str, msg: str) -> None:
        """Record a warning with the given issue id and message."""
        self.warnings.append(Issue(id, msg))

    def check(self) -> int:
        """Print and clear all collected issues.

        Returns:
            1 if at least one error was collected since the last call,
            0 otherwise. Warnings never affect the return value.
        """
        # Return a real int (the original returned a bool despite the
        # annotation); callers compare the result with ``!= 0``.
        retcode = 1 if self.errors else 0
        for issue in self.errors:
            print(f"ERROR {issue.id}: {issue.msg}")
        for issue in self.warnings:
            print(f"WARNING {issue.id}: {issue.msg}")
        # Both lists are emptied, so a second check() reports nothing.
        self.errors.clear()
        self.warnings.clear()
        return retcode


class Context:
    """Shared state for one build: settings, units, ingredients and issues.

    Until ``load_units`` / ``load_ingredients`` are called, the context
    uses ``FakeUnits`` / ``FakeIngredients``, which create entries on
    demand instead of reporting unknown names.
    """

    def __init__(self) -> None:
        self.settings = Settings()
        self.units: AUnits = FakeUnits(self)
        # Fallback unit used whenever an entry names no (or an unknown) unit.
        self.default_unit = Unit(self, "piece", [], [])
        self.ingredients: AIngredients = FakeIngredients(self)
        self.issues = Issues()

    def load_units(
        self, unitsdct: List[Dict[str, Any]], unitsschema: Dict[str, Any]
    ) -> None:
        """Replace the unit registry with the validated contents of *unitsdct*.

        Raises:
            jsonschema.exceptions.ValidationError: if *unitsdct* does not
                match *unitsschema*.
        """
        self.units = Units(self)
        # The default unit is always registered, before any loaded unit.
        self.units.units.append(self.default_unit)
        jsonschema.validate(instance=unitsdct, schema=unitsschema)
        self.units.load(unitsdct)
        self.units.validate()

    def load_ingredients(
        self, ingredientsdct: List[Dict[str, Any]], ingredientsschema: Dict[str, Any]
    ) -> None:
        """Replace the ingredient registry with the validated *ingredientsdct*.

        Raises:
            jsonschema.exceptions.ValidationError: if *ingredientsdct* does
                not match *ingredientsschema*.
        """
        self.ingredients = Ingredients(self)
        jsonschema.validate(instance=ingredientsdct, schema=ingredientsschema)
        self.ingredients.load(ingredientsdct)

    def load_settings(
        self, settingsdct: Dict[str, Any], settingsschema: Dict[str, Any]
    ) -> None:
        """Validate *settingsdct* against *settingsschema* and apply it.

        Raises:
            jsonschema.exceptions.ValidationError: if validation fails.
        """
        jsonschema.validate(instance=settingsdct, schema=settingsschema)
        self.settings.load(settingsdct)


class Settings:
    """Project-wide settings; currently only the default currency."""

    def __init__(self) -> None:
        self.default_currency: Optional[str] = None

    def load(self, settingsdct: Dict[str, Any]) -> None:
        """Read ``default_currency`` from *settingsdct* (the key must exist)."""
        self.default_currency = settingsdct["default_currency"]


class Element:
    """Base class of every object that belongs to a ``Context``."""

    def __init__(self, ctx: Context) -> None:
        self.ctx = ctx

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Alternate constructor from parsed data; the base ignores *dct*."""
        return cls(ctx)


class Conversion(Element):
    """A ratio that converts an amount in ``fromunit`` into ``tounit``."""

    def __init__(
        self, ctx: Context, fromunit: "Unit", tounit: "Unit", ratio: float
    ) -> None:
        super().__init__(ctx)
        self.fromunit = fromunit
        self.tounit = tounit
        self.ratio = ratio

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build a conversion from a dict with ``from``, ``to`` and ``ratio``.

        Raises:
            RuntimeError: if either unit name is not known to ``ctx.units``.
        """
        fromunit = ctx.units.get(dct["from"])
        if fromunit is None:
            raise RuntimeError(f"unit {dct['from']} doesn't exist")
        tounit = ctx.units.get(dct["to"])
        if tounit is None:
            raise RuntimeError(f"unit {dct['to']} doesn't exist")
        return cls(ctx, fromunit, tounit, dct["ratio"])


class Unit(Element):
    """A measurement unit with optional aliases and conversions."""

    def __init__(
        self,
        ctx: Context,
        name: str,
        conversions: List[Conversion],
        aliases: List[str],
        dct: Optional[Dict[str, Any]] = None,
    ) -> None:
        super().__init__(ctx)
        self.name = name
        self.conversions = conversions
        self.aliases = aliases
        # Raw source dict, kept so finish_load() can resolve conversions later.
        self.dct = dct

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Create a unit from its dict; conversions are resolved later.

        Conversions start as an empty list and are populated by
        ``finish_load``, which should run once all units are registered.
        """
        aliases: List[str] = dct.get("aliases", [])
        return cls(ctx, name=dct["name"], conversions=[], aliases=aliases, dct=dct)

    def finish_load(self) -> None:
        """Resolve the ``conversions`` entries of the source dict.

        Does nothing for units created without a source dict or without a
        ``conversions`` key. Each conversion dict gets its ``from`` field
        set to this unit's name (the input dict is modified in place).

        Raises:
            RuntimeError: if a conversion already carries a ``from`` field
                or refers to an unknown unit.
        """
        if self.dct is None or "conversions" not in self.dct:
            return
        conversions = []
        for convdct in self.dct["conversions"]:
            if "from" in convdct:
                raise RuntimeError(
                    "conversions in units.yaml cannot have a from field, it is automatically assigned from the unit name"
                )
            convdct["from"] = self.dct["name"]
            conversions.append(Conversion.from_dict(self.ctx, convdct))
        self.conversions = conversions


class AUnits:
    """Common interface of the unit registries."""

    def __init__(self, ctx: Context) -> None:
        self.ctx = ctx
        self.units: List[Unit] = []

    def load(self, lst: List[Any]) -> None:
        """Load units from parsed data; the base implementation is a no-op."""

    @abstractmethod
    def get(self, name: str) -> Optional[Unit]:
        """Return the unit called *name*, or None when it is unknown."""
        ...

    def validate(self) -> None:
        """Check registry consistency; the base implementation is a no-op."""


class FakeUnits(AUnits):
    """Registry that accepts every unit name, creating units on demand."""

    def get(self, name: str) -> Optional[Unit]:
        """Return the unit called *name*, registering a new one if needed."""
        for unit in self.units:
            if unit.name == name:
                return unit
        unit = Unit(self.ctx, name, [], [])
        self.units.append(unit)
        return unit


class Units(AUnits):
    """Registry of the units declared in ``units.yaml``."""

    def load(self, lst: List[Any]) -> None:
        """Register every unit dict in *lst*, then resolve all conversions."""
        for unitdct in lst:
            self.units.append(Unit.from_dict(self.ctx, unitdct))
        # Conversions may point at units declared later in the file, so
        # they are resolved only after every unit has been registered.
        for unit in self.units:
            unit.finish_load()

    def get(self, name: str) -> Optional[Unit]:
        """Return the first unit whose name or aliases match *name*."""
        for unit in self.units:
            if unit.name == name or name in unit.aliases:
                return unit
        return None

    def validate(self) -> None:
        """Report an error for every unit name registered more than once."""
        counts = collections.Counter(unit.name for unit in self.units)
        for unitname, num in counts.items():
            if num > 1:
                self.ctx.issues.error(
                    ISSUE_DUPLICATE_UNITS,
                    f"units.yaml: {unitname} should only have one entry, found {num}",
                )


class Ingredient(Element):
    """An ingredient with optional prices, conversions and aliases."""

    def __init__(
        self,
        ctx: Context,
        name: str,
        wdid: Optional[int],
        prices: List["PriceDB"],
        conversions: List[Conversion],
        aliases: List[str],
    ) -> None:
        super().__init__(ctx)
        self.name = name
        self.wdid = wdid
        self.prices = prices
        self.conversions = conversions
        self.aliases = aliases

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build an ingredient from its dict.

        ``name`` is required; ``wdid``, ``prices``, ``conversions`` and
        ``aliases`` are optional.

        Raises:
            RuntimeError: if a conversion refers to an unknown unit.
        """
        prices: List[PriceDB] = [
            PriceDB.from_dict(ctx, pricedct) for pricedct in dct.get("prices", [])
        ]
        conversions: List[Conversion] = [
            Conversion.from_dict(ctx, convdct)
            for convdct in dct.get("conversions", [])
        ]
        return cls(
            ctx,
            name=dct["name"],
            wdid=dct.get("wdid"),
            prices=prices,
            conversions=conversions,
            aliases=dct.get("aliases", []),
        )

    def dfs(
        self,
        conversions: Dict[str, Dict[str, float]],
        startname: str,
        endname: str,
        visited: Optional[List[str]] = None,
    ) -> Optional[List[str]]:
        """Depth-first search for a chain of units from *startname* to *endname*.

        Args:
            conversions: adjacency mapping ``unit -> {neighbour: ratio}``.
                It is indexed with ``[]``, so *startname* must be a key
                unless the mapping supplies defaults (``convert`` passes a
                ``defaultdict``).
            startname: unit name to start from.
            endname: unit name to reach.
            visited: unit names already on the current path.

        Returns:
            The list of unit names from the original start to *endname*
            (both inclusive), or None when no path exists.
        """
        if visited is None:
            visited = []
        if startname == endname:
            return visited + [startname]
        path_so_far = visited + [startname]
        for nextunit in conversions[startname]:
            # Units already on the path are skipped to avoid cycles.
            if nextunit in visited:
                continue
            result = self.dfs(conversions, nextunit, endname, path_so_far)
            if result is not None:
                return result
        return None

    def convert(self, amount: float, unitfrom: Unit, unitto: Unit) -> Optional[float]:
        """Convert *amount* from *unitfrom* to *unitto*.

        The conversion graph is built from this ingredient's own
        conversions plus the conversions of every registered unit; each
        conversion is usable in both directions.

        Returns:
            The converted amount, or None (after recording a
            ``known-price-unknown-conversion`` warning) when no chain of
            conversions links the two units.
        """
        graph: Dict[str, Dict[str, float]] = defaultdict(dict)

        # Work on a copy: extending self.conversions in place (as ``+=`` on
        # the original list did) would grow the ingredient's own list with
        # every unit conversion on each call.
        convs = list(self.conversions)
        for unit in self.ctx.units.units:
            convs.extend(unit.conversions)

        for conv in convs:
            fromname = conv.fromunit.name
            toname = conv.tounit.name
            graph[fromname][toname] = conv.ratio
            graph[toname][fromname] = 1 / conv.ratio

        path = self.dfs(graph, unitfrom.name, unitto.name)
        if path is None:
            self.ctx.issues.warn(
                ISSUE_KNOWN_PRICE_UNKNOWN_CONVERSION,
                f"{self.name} has a known price, but conversion {unitfrom.name} -> {unitto.name} not known",
            )
            return None
        assert len(path) != 0

        # Multiply the ratios along consecutive pairs of the path.
        previous = path[0]
        for current in path[1:]:
            amount *= graph[previous][current]
            previous = current
        return amount

    def getprice(self, amount: float, unit: Unit) -> Optional["PriceDB"]:
        """Return the price of *amount* *unit* of this ingredient.

        Returns:
            A new ``PriceDB`` for the requested amount and unit, or None
            when the ingredient has no price entries.
        """
        prices: List[PriceDB] = []
        for entry in self.prices:
            assert isinstance(entry, PriceDB)
            # NOTE(review): when the conversion is unknown the price stays
            # 0.0 and is still recorded (a warning was issued by convert);
            # confirm whether such entries should be dropped instead.
            price = 0.0
            if entry.unit == unit:
                price = (amount / entry.amount) * entry.price
            else:
                converted = self.convert(amount, unit, entry.unit)
                if converted is not None:
                    price = (converted / entry.amount) * entry.price
            prices.append(
                PriceDB(
                    self.ctx,
                    price=price,
                    amount=amount,
                    unit=unit,
                    currency=entry.currency,
                )
            )
        if len(prices) == 0:
            return None
        # NOTE(review): ingredients with more than one price entry trip
        # this assertion; kept as in the original.
        assert len(prices) == 1
        return prices[0]


class AIngredients:
    """Common interface of the ingredient registries."""

    def __init__(self, ctx: Context) -> None:
        self.ctx = ctx
        self.ingredients: List[Ingredient] = []

    def load(self, lst: List[Any]) -> None:
        """Load ingredients from parsed data; the base is a no-op."""

    @abstractmethod
    def get(self, name: str) -> Optional[Ingredient]:
        """Return the ingredient called *name*, or None when unknown."""
        ...


class FakeIngredients(AIngredients):
    """Registry that accepts every name, creating ingredients on demand."""

    def get(self, name: str) -> Optional[Ingredient]:
        """Return the ingredient called *name*, registering it if needed."""
        for ing in self.ingredients:
            if ing.name == name:
                return ing
        ing = Ingredient(self.ctx, name, None, [], [], [])
        self.ingredients.append(ing)
        return ing


class Ingredients(AIngredients):
    """Registry of the ingredients declared in ``ingredients.yaml``."""

    def load(self, lst: List[Any]) -> None:
        """Register one ingredient per dict in *lst*."""
        for ingdct in lst:
            self.ingredients.append(Ingredient.from_dict(self.ctx, ingdct))

    def get(self, name: str) -> Optional[Ingredient]:
        """Return the first ingredient whose name or aliases match *name*."""
        for ing in self.ingredients:
            if ing.name == name or name in ing.aliases:
                return ing
        return None


class PriceDB(Element):
    """A price for a given amount of a unit, in an optional currency."""

    def __init__(
        self,
        ctx: Context,
        price: float,
        amount: float,
        unit: Unit,
        currency: Optional[str],
    ) -> None:
        super().__init__(ctx)
        self.price = price
        self.amount = amount
        self.unit = unit
        self.currency = currency

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build a price entry from a dict with ``price``, ``amount``, ``unit``.

        An optional ``currency`` key overrides the default currency from
        the settings. An unknown unit is reported as an ``unknown-unit``
        error and replaced by the context's default unit.
        """
        unitstr = dct["unit"]
        unit = ctx.units.get(unitstr)
        if unit is None:
            ctx.issues.error(ISSUE_UNKNOWN_UNIT, f"unknown unit {unitstr}")
            # Uses the default unit if the unit is not known (this won't
            # be visible to the user since rendering will stop after
            # everything is evaluated).
            unit = ctx.default_unit
        currency = dct.get("currency", ctx.settings.default_currency)
        return cls(
            ctx=ctx,
            price=dct["price"],
            amount=dct["amount"],
            unit=unit,
            currency=currency,
        )


class IngredientInstance(Element):
    """One use of an ingredient inside a recipe (amount, unit, note, price)."""

    def __init__(
        self,
        ctx: Context,
        name: str,
        amount: float,
        unit: Unit,
        alternatives: List["IngredientInstance"],
        note: str,
        price: Optional[PriceDB],
    ) -> None:
        super().__init__(ctx)
        self.name = name
        self.amount = amount
        self.unit = unit
        self.alternatives = alternatives
        self.note = note
        self.price = price

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build an ingredient use from a recipe entry.

        ``name`` is required. ``amount`` defaults to 1.0, ``unit`` to the
        context's default unit and ``note`` to an empty string; ``or``
        lists alternative ingredient entries. Unknown ingredients and
        units are recorded as errors on ``ctx.issues``.
        """
        name = dct["name"]
        ingredient = ctx.ingredients.get(name)
        if ingredient is None:
            ctx.issues.error(
                ISSUE_UNKNOWN_INGREDIENT, f"unknown ingredient {dct['name']}"
            )

        amount = dct.get("amount", 1.0)

        unit = ctx.default_unit
        if "unit" in dct:
            unitstr = dct["unit"]
            found = ctx.units.get(unitstr)
            if found is None:
                # The original message lacked the f prefix and printed the
                # literal text "{unitstr}".
                ctx.issues.error(ISSUE_UNKNOWN_UNIT, f"unknown unit {unitstr}")
            else:
                unit = found

        note = dct.get("note", "")

        alternatives = [
            IngredientInstance.from_dict(ctx, ingdct) for ingdct in dct.get("or", [])
        ]

        price = None
        if ingredient is not None:
            price = ingredient.getprice(amount, unit)

        return cls(
            ctx=ctx,
            name=name,
            amount=amount,
            unit=unit,
            alternatives=alternatives,
            note=note,
            price=price,
        )


class Recipe(Element):
    """A recipe: title, ingredients, nested subrecipes, steps, total price."""

    def __init__(
        self,
        ctx: Context,
        title: str,
        ingredients: List[IngredientInstance],
        subrecipes: List["Recipe"],
        price: Optional[PriceDB],
        steps: List[str],
    ) -> None:
        super().__init__(ctx)
        # Source file name and output file name; filled in by the builder.
        self.srcpath = ""
        self.outpath = ""
        self.title = title
        self.ingredients = ingredients
        self.subrecipes = subrecipes
        self.price = price
        self.steps = steps

    @classmethod
    def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
        """Build a recipe from its dict.

        ``title`` is required; ``ingredients``, ``subrecipes`` and
        ``steps`` are optional. The recipe price is the sum of the prices
        of its own ingredients and is only set when every ingredient has
        a price and all prices share one currency.
        """
        ingredients: List[IngredientInstance] = [
            IngredientInstance.from_dict(ctx, ingdct)
            for ingdct in dct.get("ingredients", [])
        ]
        subrecipes: List[Recipe] = [
            Recipe.from_dict(ctx, partdct) for partdct in dct.get("subrecipes", [])
        ]

        total: float = 0
        ingswithoutprice = 0
        currency = None
        for ing in ingredients:
            if ing.price is None:
                ingswithoutprice += 1
                continue
            total += ing.price.price
            cur_currency = ing.price.currency
            if currency is None:
                currency = cur_currency
            elif currency != cur_currency:
                # we don't know how to convert currencies yet
                currency = None
                break

        price: Optional[PriceDB] = None
        if currency is not None and ingswithoutprice == 0 and len(ingredients) != 0:
            price = PriceDB(
                ctx=ctx,
                price=total,
                amount=1,
                unit=ctx.default_unit,
                currency=currency,
            )

        return cls(
            ctx=ctx,
            title=dct["title"],
            ingredients=ingredients,
            subrecipes=subrecipes,
            price=price,
            steps=dct.get("steps", []),
        )


def _top_level_files(path: str) -> List[str]:
    """Return the names of the files directly inside *path* (no recursion).

    Only the first directory yielded by ``os.walk`` is used. An empty
    list is returned when ``os.walk`` yields nothing, e.g. because
    *path* does not exist.
    """
    for _, _, names in os.walk(path):
        return list(names)
    return []


class Builder:
    """Loads a recipe project directory and renders it to HTML."""

    def __init__(self) -> None:
        self.jinjaenv = jinja2.Environment(
            loader=jinja2.FileSystemLoader("templates"),
            autoescape=jinja2.select_autoescape(),
        )

        def numprint(value: float) -> str:
            """Format a number, dropping a trailing ``.0``."""
            out = str(value)
            if out.endswith(".0"):
                return out.split(".", maxsplit=1)[0]
            return out

        def amountprint(value: float) -> str:
            """Like ``numprint`` but renders 0.5, 0.25 and 0.75 as fractions."""
            out = numprint(value)
            fractions = {"0.5": "1/2", "0.25": "1/4", "0.75": "3/4"}
            return fractions.get(out, out)

        self.jinjaenv.filters["numprint"] = numprint
        self.jinjaenv.filters["amountprint"] = amountprint

        self.ctx = Context()

        # list of output files that will be built
        self.outfiles: Set[str] = set()

    def load_file(self, file: str) -> Any:
        """Read *file* and parse it as JSON (``.json``) or YAML (anything else)."""
        print(f"loading {file}")
        with open(file, encoding="utf-8") as f:
            txt = f.read()
        if file.endswith(".json"):
            return json.loads(txt)
        return yaml.safe_load(txt)

    def rendertemplate(
        self, templatepath: str, format: str, file: str, dir: str, args: Any
    ) -> None:
        """Render *templatepath* with *args* into ``dir/out/format/file``.

        The output directory is created if needed and *file* is recorded
        in ``self.outfiles``.
        """
        template = self.jinjaenv.get_template(templatepath)
        print(f"rendering {file}")
        outstr = template.render(args)
        outdir = f"{dir}/out/{format}"
        os.makedirs(outdir, exist_ok=True)
        with open(f"{outdir}/{file}", "w", encoding="utf-8") as f:
            f.write(outstr)
        self.outfiles.add(file)

    def load(self, dir: str) -> int:
        """Load settings, units and ingredients from *dir* where present.

        Returns:
            0 on success, 1 as soon as one of the files produced an error.

        Raises:
            jsonschema.exceptions.ValidationError: if a file does not
                match its schema.
        """
        if os.path.isfile(dir + "/settings.yaml"):
            settingsdct = self.load_file(dir + "/settings.yaml")
            settingsschema = self.load_file("schemas/settings.json")
            self.ctx.load_settings(settingsdct, settingsschema)
        if self.ctx.issues.check() != 0:
            return 1

        if os.path.isfile(dir + "/units.yaml"):
            unitsschema = self.load_file("schemas/units.json")
            unitsdct = self.load_file(dir + "/units.yaml")
            self.ctx.load_units(unitsdct, unitsschema)
        if self.ctx.issues.check() != 0:
            return 1

        if os.path.isfile(dir + "/ingredients.yaml"):
            ingredientsdct = self.load_file(dir + "/ingredients.yaml")
            ingredientsschema = self.load_file("schemas/ingredients.json")
            self.ctx.load_ingredients(ingredientsdct, ingredientsschema)
        if self.ctx.issues.check() != 0:
            return 1

        return 0

    def run(self, dir: str) -> int:
        """Load every recipe in ``dir/recipes`` and render the HTML pages.

        All recipes are evaluated so that every issue gets reported; if
        any recipe produced an error nothing is rendered.

        Returns:
            0 on success, 1 if at least one recipe had errors.

        Raises:
            jsonschema.exceptions.ValidationError: if a recipe does not
                match the recipe schema.
        """
        # Only files directly inside recipes/ are considered; the original
        # loop kept the file list of the last directory os.walk visited.
        files = _top_level_files(dir + "/recipes")
        files.sort()

        recipes: List[Recipe] = []
        recipeschema = self.load_file("schemas/recipe.json")
        failed = False
        for file in files:
            if not file.endswith(".yaml"):
                print(f"unknown extension of {file}")
                continue
            recipedct = self.load_file(dir + "/recipes/" + file)
            jsonschema.validate(instance=recipedct, schema=recipeschema)
            recipe = Recipe.from_dict(self.ctx, recipedct)
            recipe.srcpath = file
            recipe.outpath = file[:-5] + ".html"
            # check() clears the collected issues, so the failure has to
            # be remembered here; a later check() would report nothing.
            if self.ctx.issues.check() != 0:
                failed = True
                continue
            recipes.append(recipe)

        if failed:
            return 1

        self.rendertemplate(
            templatepath="index.html",
            format="html",
            file="index.html",
            dir=dir,
            args={"recipes": recipes},
        )
        for recipe in recipes:
            self.rendertemplate(
                templatepath="recipe.html",
                format="html",
                file=recipe.outpath,
                dir=dir,
                args={"recipe": recipe},
            )
        return 0

    def finish(self, dir: str) -> int:
        """Delete files in ``dir/out/html`` that this build did not render.

        Returns:
            Always 0.
        """
        existing = set(_top_level_files(f"{dir}/out/html"))
        # files we did not generate, probably left by a previous run, but
        # not valid anymore
        extra_files = existing - self.outfiles
        for file in sorted(extra_files):
            print(f"removing obsolete {file}")
            os.remove(f"{dir}/out/html/{file}")
        return 0

    def build(self, path: str) -> int:
        """Run load, run and finish on *path*, stopping at the first failure.

        Returns:
            0 on success; otherwise the first non-zero step result, or 1
            when a schema validation error was raised (it is printed).
        """
        for step in (self.load, self.run, self.finish):
            try:
                ret = step(path)
            except jsonschema.exceptions.ValidationError as e:
                print("ERROR:", e)
                return 1
            if ret != 0:
                return ret
        return 0


def help() -> None:
    """Print the command-line usage."""
    print(f"usage: {sys.argv[0]} build DIR - build pages in DIR/out")
    print(f"       {sys.argv[0]} -h        - show help")


def main() -> None:
    """Command-line entry point; exits with the build's return code."""
    if len(sys.argv) == 2 and sys.argv[1] == "-h":
        help()
        sys.exit(0)
    elif len(sys.argv) == 3 and sys.argv[1] == "build":
        ret = Builder().build(sys.argv[2])
        sys.exit(ret)
    else:
        help()
        sys.exit(1)


if __name__ == "__main__":
    main()