from abc import abstractmethod import collections import sys from typing import Dict, List, Any, Optional, Self, Set import os import json import yaml import jinja2 import jsonschema import jsonschema.exceptions ISSUE_UNKNOWN_UNIT = "unknown-unit" ISSUE_UNKNOWN_INGREDIENT = "unknown-ingredient" ISSUE_DUPLICATE_UNITS = "duplicate-units" ISSUE_KNOWN_PRICE_UNKNOWN_CONVERSION = "known-price-unknown-conversion" class Issue: def __init__(self, id: str, msg: str) -> None: self.id = id self.msg = msg class Issues: def __init__(self) -> None: self.errors: List[Issue] = [] self.warnings: List[Issue] = [] def error(self, id: str, msg: str) -> None: self.errors.append(Issue(id, msg)) def warn(self, id: str, msg: str) -> None: self.warnings.append(Issue(id, msg)) def check(self) -> int: retcode = len(self.errors) != 0 for msg in self.errors: print(f"ERROR {msg.id}: {msg.msg}") for msg in self.warnings: print(f"WARNING {msg.id}: {msg.msg}") self.errors.clear() self.warnings.clear() return retcode class Context: def __init__(self) -> None: self.settings = Settings() self.units: AUnits = FakeUnits(self) self.default_unit = Unit(self, "piece", [], []) self.ingredients: AIngredients = FakeIngredients(self) self.issues = Issues() def load_units( self, unitsdct: List[Dict[str, Any]], unitsschema: Dict[str, Any] ) -> None: self.units = Units(self) self.units.units.append(self.default_unit) jsonschema.validate(instance=unitsdct, schema=unitsschema) self.units.load(unitsdct) self.units.validate() def load_ingredients( self, ingredientsdct: List[Dict[str, Any]], ingredientsschema: Dict[str, Any] ) -> None: self.ingredients = Ingredients(self) jsonschema.validate(instance=ingredientsdct, schema=ingredientsschema) self.ingredients.load(ingredientsdct) def load_settings( self, settingsdct: Dict[str, Any], settingsschema: Dict[str, Any] ) -> None: jsonschema.validate(instance=settingsdct, schema=settingsschema) self.settings.load(settingsdct) class Settings: def __init__(self) -> None: self.default_currency: Optional[str] = None def load(self, settingsdct: Dict[str, Any]) -> None: self.default_currency = settingsdct["default_currency"] class Element: def __init__(self, ctx: Context) -> None: self.ctx = ctx @classmethod def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self: return cls(ctx) class Conversion(Element): def __init__( self, ctx: Context, fromunit: "Unit", tounit: "Unit", ratio: float ) -> None: super().__init__(ctx) self.fromunit = fromunit self.tounit = tounit self.ratio = ratio @classmethod def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self: fromunit = ctx.units.get(dct["from"]) if fromunit is None: raise RuntimeError(f"unit {dct['from']} doesn't exist") tounit = ctx.units.get(dct["to"]) if tounit is None: raise RuntimeError(f"unit {dct['to']} doesn't exist") return cls(ctx, fromunit, tounit, dct["ratio"]) class Unit(Element): def __init__( self, ctx: Context, name: str, conversions: List[Conversion], aliases: List[str], dct: Optional[Dict[str, Any]] = None, ) -> None: super().__init__(ctx) self.name = name self.conversions = conversions self.aliases = aliases self.dct = dct @classmethod def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self: # conversions are an empty list which is populated in finish_load # which should be ran when all units are registered aliases: List[str] = [] if "aliases" in dct: aliases = dct["aliases"] return cls(ctx, name=dct["name"], conversions=[], aliases=aliases, dct=dct) def finish_load(self) -> None: if self.dct is None: return if "conversions" not in self.dct: return conversions = [] for conv in self.dct["conversions"]: if "from" in conv: raise RuntimeError( "conversions in units.yaml cannot have a from field, it is automatically assigned from the unit name" ) conv["from"] = self.dct["name"] conv = Conversion.from_dict(self.ctx, conv) conversions.append(conv) self.conversions = conversions class AUnits: def __init__(self, ctx: Context) -> None: self.ctx = ctx self.units: List[Unit] = [] def load(self, lst: List[Any]) -> None: pass @abstractmethod def get(self, name: str) -> Optional[Unit]: ... def validate(self) -> None: pass class FakeUnits(AUnits): def get(self, name: str) -> Optional[Unit]: for unit in self.units: if unit.name == name: return unit unit = Unit(self.ctx, name, [], []) self.units.append(unit) return unit class Units(AUnits): def load(self, lst: List[Any]) -> None: for unitdct in lst: unit = Unit.from_dict(self.ctx, unitdct) self.units.append(unit) for unit in self.units: unit.finish_load() def get(self, name: str) -> Optional[Unit]: for unit in self.units: if unit.name == name or name in unit.aliases: return unit return None def validate(self) -> None: unitnames = [] for unit in self.units: unitnames.append(unit.name) for unitname, num in collections.Counter(unitnames).items(): if num > 1: self.ctx.issues.error( ISSUE_DUPLICATE_UNITS, f"units.yaml: {unitname} should only have one entry, found {num}", ) class Ingredient(Element): def __init__( self, ctx: Context, name: str, wdid: Optional[int], prices: List["PriceDB"], conversions: List[Conversion], aliases: List[str], ) -> None: super().__init__(ctx) self.name = name self.wdid = wdid self.prices = prices self.conversions = conversions self.aliases = aliases @classmethod def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self: wdid = None if "wdid" in dct: wdid = dct["wdid"] prices: List[PriceDB] = [] if "prices" in dct: for pricedct in dct["prices"]: prices.append(PriceDB.from_dict(ctx, pricedct)) conversions: List[Conversion] = [] if "conversions" in dct: for convdct in dct["conversions"]: conversion = Conversion.from_dict(ctx, convdct) conversions.append(conversion) aliases: List[str] = [] if "aliases" in dct: aliases = dct["aliases"] return cls( ctx, name=dct["name"], wdid=wdid, prices=prices, conversions=conversions, aliases=aliases, ) def dfs( self, conversions: Dict[str, Dict[str, float]], startname: str, endname: str, visited: Optional[List[str]] = None, ) -> Optional[List[str]]: if visited is None: visited = [] if startname == endname: return visited + [startname] for nextunit in conversions[startname].keys(): if nextunit in visited: continue result = self.dfs(conversions, nextunit, endname, visited + [startname]) if result is not None: return result return None def convert(self, amount: float, unitfrom: Unit, unitto: Unit) -> Optional[float]: conversions: Dict[str, Dict[str, float]] = collections.defaultdict(dict) # construct node tree convs = self.conversions for unit in self.ctx.units.units: convs += unit.conversions for conv in convs: fromname = conv.fromunit.name toname = conv.tounit.name conversions[fromname][toname] = conv.ratio conversions[toname][fromname] = 1 / conv.ratio # find path between conversions path = self.dfs(conversions, unitfrom.name, unitto.name) if path is None: self.ctx.issues.warn( ISSUE_KNOWN_PRICE_UNKNOWN_CONVERSION, f"{self.name} has a known price, but conversion {unitfrom.name} -> {unitto.name} not known", ) return None assert len(path) != 0 oldelem = path[0] for elem in path[1:]: amount *= conversions[oldelem][elem] oldelem = elem return amount def getprice(self, amount: float, unit: Unit) -> Optional["PriceDB"]: prices: List[PriceDB] = [] pricedbs: List[PriceDB] = self.prices for entry in pricedbs: assert isinstance(entry, PriceDB) entryamount: float = entry.amount entryprice: float = entry.price entryunit: Unit = entry.unit price = 0.0 if entryunit == unit: price = (amount / entryamount) * entryprice else: pricex = self.convert(amount, unit, entryunit) if pricex is not None: price = (pricex / entryamount) * entryprice newentry = PriceDB( self.ctx, price=price, amount=amount, unit=unit, currency=entry.currency, ) prices.append(newentry) if len(prices) == 0: return None assert len(prices) == 1 return prices[0] class AIngredients: def __init__(self, ctx: Context) -> None: self.ctx = ctx self.ingredients: List[Ingredient] = [] def load(self, lst: List[Any]) -> None: pass @abstractmethod def get(self, name: str) -> Optional[Ingredient]: ... class FakeIngredients(AIngredients): def get(self, name: str) -> Optional[Ingredient]: for ing in self.ingredients: if ing.name == name: return ing ing = Ingredient(self.ctx, name, None, [], [], []) self.ingredients.append(ing) return ing class Ingredients(AIngredients): def load(self, lst: List[Any]) -> None: for ingdct in lst: ing = Ingredient.from_dict(self.ctx, ingdct) self.ingredients.append(ing) def get(self, name: str) -> Optional[Ingredient]: for ing in self.ingredients: if ing.name == name or name in ing.aliases: return ing return None class PriceDB(Element): def __init__( self, ctx: Context, price: float, amount: float, unit: Unit, currency: Optional[str], ) -> None: super().__init__(ctx) self.price = price self.amount = amount self.unit = unit self.currency = currency @classmethod def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self: price = dct["price"] amount = dct["amount"] unitstr = dct["unit"] unitx = ctx.units.get(unitstr) # uses the default unit if unit is not known (this won't be # visible to the user since rendering will stop after # everything is evaluated) unit = ctx.default_unit if unitx is None: ctx.issues.error(ISSUE_UNKNOWN_UNIT, f"unknown unit {unitstr}") else: unit = unitx currency = ctx.settings.default_currency if "currency" in dct: currency = dct["currency"] return cls(ctx=ctx, price=price, amount=amount, unit=unit, currency=currency) class IngredientInstance(Element): def __init__( self, ctx: Context, name: str, amount: float, unit: Unit, alternatives: List["IngredientInstance"], note: str, price: Optional[PriceDB], ) -> None: super().__init__(ctx) self.name = name self.amount = amount self.unit = unit self.alternatives = alternatives self.note = note self.price = price @classmethod def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self: name = dct["name"] ingredient = ctx.ingredients.get(name) if ingredient is None: ctx.issues.error( ISSUE_UNKNOWN_INGREDIENT, f"unknown ingredient {dct['name']}" ) amount = 1.0 if "amount" in dct: amount = dct["amount"] unit = ctx.default_unit if "unit" in dct: unitstr = dct["unit"] unitx = ctx.units.get(unitstr) if unitx is None: ctx.issues.error(ISSUE_UNKNOWN_UNIT, "unknown unit {unitstr}") else: unit = unitx note = "" if "note" in dct: note = dct["note"] alternatives = [] if "or" in dct: for ingdct in dct["or"]: ing = IngredientInstance.from_dict(ctx, ingdct) alternatives.append(ing) price = None if ingredient is not None: price = ingredient.getprice(amount, unit) return cls( ctx=ctx, name=name, amount=amount, unit=unit, alternatives=alternatives, note=note, price=price, ) class Recipe(Element): def __init__( self, ctx: Context, title: str, ingredients: List[IngredientInstance], subrecipes: List["Recipe"], price: Optional[PriceDB], steps: List[str], ) -> None: super().__init__(ctx) self.srcpath = "" self.outpath = "" self.title = title self.ingredients = ingredients self.subrecipes = subrecipes self.price = price self.steps = steps @classmethod def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self: ingredients: List[IngredientInstance] = [] if "ingredients" in dct: for ingdct in dct["ingredients"]: ingredient = IngredientInstance.from_dict(ctx, ingdct) ingredients.append(ingredient) subrecipes: List[Recipe] = [] if "subrecipes" in dct: for partdct in dct["subrecipes"]: rp = Recipe.from_dict(ctx, partdct) subrecipes.append(rp) price: Optional[PriceDB] = None pricex: float = 0 ingswithprice = 0 ingswithoutprice = 0 currency = None for ing in ingredients: if ing.price is None: ingswithoutprice += 1 continue ingswithprice += 1 pricex += ing.price.price cur_currency = ing.price.currency if currency is None: currency = cur_currency elif currency != cur_currency: # we don't know how to convert currencies yet currency = None break if currency is None or ingswithoutprice != 0 or len(ingredients) == 0: price = None else: price = PriceDB( ctx=ctx, price=pricex, amount=1, unit=ctx.default_unit, currency=currency, ) steps = [] if "steps" in dct: steps = dct["steps"] return cls( ctx=ctx, title=dct["title"], ingredients=ingredients, subrecipes=subrecipes, price=price, steps=steps, ) class Builder: def __init__(self) -> None: self.jinjaenv = jinja2.Environment( loader=jinja2.FileSystemLoader("templates"), autoescape=jinja2.select_autoescape(), ) def numprint(input: int) -> str: out = str(input) if out.endswith(".0"): return out.split(".", maxsplit=1)[0] return out def amountprint(input: int) -> str: out = numprint(input) if out == "0.5": return "1/2" if out == "0.25": return "1/4" if out == "0.75": return "3/4" return out self.jinjaenv.filters["numprint"] = numprint self.jinjaenv.filters["amountprint"] = amountprint self.ctx = Context() # list of output files that will be built self.outfiles: Set[str] = set() def load_file(self, file: str) -> Any: print(f"loading {file}") with open(file, encoding="utf-8") as f: txt = f.read() if file.endswith(".json"): return json.loads(txt) return yaml.safe_load(txt) def rendertemplate( self, templatepath: str, format: str, file: str, dir: str, args: Any ) -> None: template = self.jinjaenv.get_template(templatepath) print(f"rendering {file}") outstr = template.render(args) os.makedirs(f"{dir}/out/{format}", exist_ok=True) with open(f"{dir}/out/{format}/{file}", "w", encoding="utf-8") as f: f.write(outstr) self.outfiles.add(file) def load(self, dir: str) -> int: if os.path.isfile(dir + "/settings.yaml"): settingsdct = self.load_file(dir + "/settings.yaml") settingsschema = self.load_file("schemas/settings.json") self.ctx.load_settings(settingsdct, settingsschema) retcode = self.ctx.issues.check() if retcode != 0: return 1 if os.path.isfile(dir + "/units.yaml"): unitsschema = self.load_file("schemas/units.json") unitsdct = self.load_file(dir + "/units.yaml") self.ctx.load_units(unitsdct, unitsschema) retcode = self.ctx.issues.check() if retcode != 0: return 1 if os.path.isfile(dir + "/ingredients.yaml"): ingredientsdct = self.load_file(dir + "/ingredients.yaml") ingredientsschema = self.load_file("schemas/ingredients.json") self.ctx.load_ingredients(ingredientsdct, ingredientsschema) retcode = self.ctx.issues.check() if retcode != 0: return 1 return 0 def run(self, dir: str) -> int: files = [] for _, _, filesx in os.walk(dir + "/recipes"): files = filesx files.sort() recipes: List[Recipe] = [] recipeschema = self.load_file("schemas/recipe.json") for file in files: if not file.endswith(".yaml"): print(f"unknown extension of {file}") continue recipedct = self.load_file(dir + "/recipes/" + file) jsonschema.validate(instance=recipedct, schema=recipeschema) recipe = Recipe.from_dict(self.ctx, recipedct) recipe.srcpath = file recipe.outpath = file[:-5] + ".html" if self.ctx.issues.check() != 0: continue recipes.append(recipe) retcode = self.ctx.issues.check() if retcode != 0: return 1 self.rendertemplate( templatepath="index.html", format="html", file="index.html", dir=dir, args={"recipes": recipes}, ) for recipe in recipes: self.rendertemplate( templatepath="recipe.html", format="html", file=recipe.outpath, dir=dir, args={"recipe": recipe}, ) return 0 def finish(self, dir: str) -> int: files = set() for _, _, filesx in os.walk(f"{dir}/out/html"): files = set(filesx) # files we did not generate, probably left by a previous run, but not valid anymore extra_files = files - self.outfiles for file in extra_files: print(f"removing obsolete {file}") os.remove(f"{dir}/out/html/{file}") return 0 def build(self, path: str) -> int: fcs = [self.load, self.run, self.finish] for func in fcs: try: ret = func(path) if ret != 0: return ret except jsonschema.exceptions.ValidationError as e: print("ERROR:", e) return 1 return 0 def help() -> None: print(f"usage: {sys.argv[0]} build DIR - build pages in DIR/out") print(f" {sys.argv[0]} -h - show help") def main() -> None: if len(sys.argv) == 2 and sys.argv[1] == "-h": help() sys.exit(0) elif len(sys.argv) == 3 and sys.argv[1] == "build": ret = Builder().build(sys.argv[2]) sys.exit(ret) else: help() sys.exit(1) if __name__ == "__main__": main()