506 lines
16 KiB
Python
506 lines
16 KiB
Python
from abc import abstractmethod
|
|
from collections import defaultdict
|
|
import collections
|
|
import sys
|
|
from typing import Dict, List, Any, Optional, Set
|
|
import os
|
|
import json
|
|
|
|
import yaml
|
|
import jinja2
|
|
import jsonschema
|
|
import jsonschema.exceptions
|
|
|
|
# Issue identifiers. Issues.check() prints the identifier next to the message.

# A unit name could not be resolved through Units.get().
ISSUE_UNKNOWN_UNIT = "unknown-unit"
# An ingredient name could not be resolved through Ingredients.get().
ISSUE_UNKNOWN_INGREDIENT = "unknown-ingredient"
# The same unit name occurs more than once among the loaded units.
ISSUE_DUPLICATE_UNITS = "duplicate-units"
# A price is known, but no conversion path to the priced unit was found.
ISSUE_KNOWN_PRICE_UNKNOWN_CONVERSION = "known-price-unknown-conversion"
|
|
|
|
|
|
class Issue:
    """A single reported problem: a machine-readable id plus a message."""

    def __init__(self, id: str, msg: str) -> None:
        """Store the issue identifier and its human-readable message."""
        self.id, self.msg = id, msg
|
|
|
|
|
|
class Issues:
    """Accumulates errors and warnings until they are reported by check()."""

    def __init__(self) -> None:
        """Start with no recorded errors or warnings."""
        self.errors: List[Issue] = []
        self.warnings: List[Issue] = []

    def error(self, id: str, msg: str) -> None:
        """Record an error with identifier ``id`` and message ``msg``."""
        self.errors.append(Issue(id, msg))

    def warn(self, id: str, msg: str) -> None:
        """Record a warning with identifier ``id`` and message ``msg``."""
        self.warnings.append(Issue(id, msg))

    def check(self) -> int:
        """Print and discard every recorded issue.

        Returns:
            1 when at least one error was recorded since the previous call,
            0 otherwise. Warnings never influence the return value.
        """
        # A real int (not a bool), as promised by the annotation.
        retcode = 1 if self.errors else 0

        for issue in self.errors:
            print(f"ERROR {issue.id}: {issue.msg}")
        for issue in self.warnings:
            print(f"WARNING {issue.id}: {issue.msg}")

        # Reported issues are dropped so the next check starts clean.
        self.errors.clear()
        self.warnings.clear()
        return retcode
|
|
|
|
|
|
class Context:
    """Shared state handed to every element: units, ingredients and issues."""

    def __init__(self) -> None:
        """Create empty registries and register the default "piece" unit."""
        self.units = Units(self)

        # Build the unit first and only then look up the list to append to:
        # constructing a Unit may replace ``self.units.units``.
        piece = Unit(self, {"name": "piece"})
        self.default_unit = piece
        self.units.units.append(piece)

        self.ingredients = Ingredients(self)
        self.issues = Issues()
|
|
|
|
|
|
class Element:
    """Dict-backed base class for everything loaded from the data files.

    Subclasses implement load() to replace raw values in the backing dict
    with richer objects. Item access, assignment and membership tests are
    forwarded to that dict.
    """

    def __init__(self, ctx: Context, dct: Dict[str, Any]) -> None:
        """Keep ``ctx`` and ``dct``, run load() and verify the result.

        Raises:
            RuntimeError: if a value of the dict is still a plain ``dict``
                after load() has run.
        """
        self.ctx = ctx
        self.dct = dct
        self.load(dct)
        # Only direct values are inspected; dicts nested in lists are not.
        if any(isinstance(value, dict) for value in self.dct.values()):
            raise RuntimeError("Something wasn't processed properly")

    def __contains__(self, item: Any) -> bool:
        """Return whether ``item`` is a key of the backing dict."""
        return item in self.dct

    def __getitem__(self, key: str) -> Any:
        """Return the value stored under ``key`` (KeyError if missing)."""
        return self.dct[key]

    def __setitem__(self, key: str, val: Any) -> None:
        """Store ``val`` under ``key`` in the backing dict."""
        self.dct[key] = val

    def __repr__(self) -> str:
        """Return the repr of the backing dict."""
        return repr(self.dct)

    @abstractmethod
    def load(self, dct: Dict[str, Any]) -> None:
        """Process the raw dict; to be provided by subclasses."""
        ...
|
|
|
|
|
|
class Conversion(Element):
    """A conversion between two units, referenced by name in the raw data."""

    def load(self, dct: Dict[str, Any]) -> None:
        """Replace the ``from`` and ``to`` unit names with Unit objects.

        Raises:
            RuntimeError: if either name is not known to the unit registry.
        """
        # "from" is resolved and stored before "to" is looked at.
        for endpoint in ("from", "to"):
            unitname = dct[endpoint]
            resolved = self.ctx.units.get(unitname)
            if resolved is None:
                raise RuntimeError(f"unit {unitname} doesn't exist")
            self[endpoint] = resolved
|
|
|
|
|
|
class Unit(Element):
    """A measurement unit with optional aliases and conversions."""

    def load(self, dct: Dict[str, Any]) -> None:
        """Normalise ``aliases`` to a list (empty when none are given).

        Conversions are not touched here: they reference other units by
        name and are resolved later by finish_load().
        """
        self["aliases"] = list(dct.get("aliases", []))

    def finish_load(self) -> None:
        """Turn the raw ``conversions`` entries into Conversion objects.

        Each conversion's ``from`` unit is filled in with this unit's name.
        ``conversions`` is always set afterwards, to an empty list when the
        unit declares none.

        Raises:
            RuntimeError: if a conversion entry already carries a ``from``
                field, or if Conversion cannot resolve one of its units.
        """
        conversions: List[Conversion] = []
        for convdct in self.dct.get("conversions", []):
            # Inspect the individual entry, not the surrounding list.
            if "from" in convdct:
                raise RuntimeError(
                    "conversions in units.yaml cannot have a from field, it is automatically assigned from the unit name"
                )
            convdct["from"] = self["name"]
            conversions.append(Conversion(self.ctx, convdct))
        self["conversions"] = conversions
|
|
|
|
|
|
class Units:
    """Registry of all known Unit objects."""

    def __init__(self, ctx: Context) -> None:
        """Create an empty registry bound to ``ctx``."""
        self.ctx = ctx
        self.units: List[Unit] = []

    def load(self, lst: List[Any]) -> None:
        """Create a Unit per raw entry, then finish loading every unit."""
        for raw in lst:
            # Construct first, append second: building a Unit may replace
            # ``self.units``, so the list must be looked up afterwards.
            created = Unit(self.ctx, raw)
            self.units.append(created)
        # Second pass over all registered units, once every name is known.
        for registered in self.units:
            registered.finish_load()

    def get(self, name: str) -> Optional[Unit]:
        """Return the first unit whose name or alias equals ``name``."""
        for candidate in self.units:
            if candidate["name"] == name:
                return candidate
            if "aliases" in candidate and name in candidate["aliases"]:
                return candidate
        return None

    def validate(self) -> None:
        """Record a duplicate-units error for every name used more than once."""
        occurrences = collections.Counter(unit["name"] for unit in self.units)
        for unitname, num in occurrences.items():
            if num <= 1:
                continue
            self.ctx.issues.error(
                ISSUE_DUPLICATE_UNITS,
                f"units.yaml: {unitname} should only have one entry, found {num}",
            )
|
|
|
|
|
|
class Ingredient(Element):
    """An ingredient with optional price entries and unit conversions."""

    def load(self, dct: Dict[str, Any]) -> None:
        """Wrap the raw ``prices`` and ``conversions`` entries in objects.

        ``prices`` is only replaced when present; ``conversions`` is always
        set, to an empty list when the entry declares none.
        """
        if "prices" in dct:
            pricedb = PriceDBs(self.ctx)
            pricedb.load(dct["prices"])
            self["prices"] = pricedb

        self["conversions"] = [
            Conversion(self.ctx, convdct) for convdct in dct.get("conversions", [])
        ]

    def dfs(
        self,
        conversions: Dict[str, Dict[str, float]],
        startname: str,
        endname: str,
        visited: Optional[List[str]] = None,
    ) -> Optional[List[str]]:
        """Depth-first search for a chain of conversions between two units.

        Args:
            conversions: adjacency mapping ``{from_name: {to_name: ratio}}``.
            startname: name of the unit to start from.
            endname: name of the unit to reach.
            visited: unit names already on the current path.

        Returns:
            The unit names from the original start to ``endname`` inclusive,
            or None when ``endname`` cannot be reached.
        """
        if visited is None:
            visited = []
        if startname == endname:
            return visited + [startname]

        # .get() keeps the search read-only, also for a defaultdict graph.
        for nextunit in conversions.get(startname, {}):
            if nextunit in visited:
                continue
            result = self.dfs(conversions, nextunit, endname, visited + [startname])
            if result is not None:
                return result
        return None

    def convert(self, amount: float, unitfrom: Unit, unitto: Unit) -> Optional[float]:
        """Convert ``amount`` from ``unitfrom`` to ``unitto``.

        Both the ingredient's own conversions and those of every registered
        unit are considered, in both directions.

        Returns:
            The converted amount, or None when no chain of conversions
            connects the two units (a warning is recorded in that case).
        """
        graph: Dict[str, Dict[str, float]] = defaultdict(dict)

        # Work on a copy: extending self["conversions"] in place would add
        # the unit conversions to the ingredient again on every call.
        convs = list(self["conversions"])
        for unit in self.ctx.units.units:
            convs.extend(unit["conversions"])

        for conv in convs:
            fromname = conv["from"]["name"]
            toname = conv["to"]["name"]
            graph[fromname][toname] = conv["ratio"]
            graph[toname][fromname] = 1 / conv["ratio"]

        path = self.dfs(graph, unitfrom["name"], unitto["name"])
        if path is None:
            self.ctx.issues.warn(
                ISSUE_KNOWN_PRICE_UNKNOWN_CONVERSION,
                f'{self["name"]} has a known price, but conversion {unitfrom["name"]} -> {unitto["name"]} not known',
            )
            return None
        assert len(path) != 0

        # Multiply the ratios of each consecutive pair along the path.
        for previous, current in zip(path, path[1:]):
            amount *= graph[previous][current]
        return amount

    def getprice(self, amount: float, unit: Unit) -> Optional[float]:
        """Return the price of ``amount`` of this ingredient given in ``unit``.

        Returns:
            None when the ingredient has no prices or none of its price
            entries can be related to ``unit``; otherwise the computed price.
        """
        if "prices" not in self.dct:
            return None

        prices: List[float] = []
        pricedbs: PriceDBs = self["prices"]
        for entry in pricedbs.pricedbs:
            assert isinstance(entry, PriceDB)
            entryamount: float = entry["amount"]
            entryprice: float = entry["price"]
            entryunit: Unit = entry["unit"]
            if entryunit == unit:
                prices.append((amount / entryamount) * entryprice)
                continue
            newamount = self.convert(amount, unit, entryunit)
            if newamount is not None:
                prices.append((newamount / entryamount) * entryprice)

        if not prices:
            return None
        # NOTE(review): more than one usable price entry trips this assert;
        # how several prices should be combined is not defined here.
        assert len(prices) == 1
        return prices[0]
|
|
|
|
|
|
class Ingredients:
    """Registry of all known Ingredient objects."""

    def __init__(self, ctx: Context) -> None:
        """Create an empty registry bound to ``ctx``."""
        self.ctx = ctx
        self.ingredients: List[Ingredient] = []

    def load(self, lst: List[Any]) -> None:
        """Create an Ingredient for every raw entry and register it."""
        for raw in lst:
            self.ingredients.append(Ingredient(self.ctx, raw))

    def get(self, name: str) -> Optional[Ingredient]:
        """Return the first ingredient whose name or alias equals ``name``."""
        for candidate in self.ingredients:
            if candidate["name"] == name:
                return candidate
            if "aliases" in candidate and name in candidate["aliases"]:
                return candidate
        return None
|
|
|
|
|
|
class PriceDBs:
    """The list of price entries attached to one ingredient."""

    def __init__(self, ctx: Context) -> None:
        """Create an empty price list bound to ``ctx``."""
        self.ctx = ctx
        self.pricedbs: List[PriceDB] = []

    def load(self, lst: List[Any]) -> None:
        """Create a PriceDB for every raw entry and store it in order."""
        for raw in lst:
            self.pricedbs.append(PriceDB(self.ctx, raw))

    def __repr__(self) -> str:
        """Return the repr of the underlying list of entries."""
        return repr(self.pricedbs)
|
|
|
|
|
|
class PriceDB(Element):
    """One price entry: a price for an amount of some unit."""

    def load(self, dct: Dict[str, Any]) -> None:
        """Fill in the default amount and resolve the unit.

        A missing ``amount`` becomes 1.0. A missing ``unit`` becomes the
        context's default unit; a unit name that cannot be resolved is
        stored as None and recorded as an unknown-unit error.
        """
        if "amount" not in dct:
            self["amount"] = 1.0

        if "unit" not in dct:
            self["unit"] = self.ctx.default_unit
            return

        unitstr = dct["unit"]
        resolved = self.ctx.units.get(unitstr)
        self["unit"] = resolved
        if resolved is None:
            self.ctx.issues.error(ISSUE_UNKNOWN_UNIT, f"unknown unit {unitstr}")
|
|
|
|
|
|
class IngredientInstance(Element):
    """One use of an ingredient inside a recipe (amount, unit, note, ...)."""

    def load(self, dct: Dict[str, Any]) -> None:
        """Resolve the ingredient and unit, apply defaults, compute the price.

        After loading, the keys ``ingredient``, ``amount``, ``unit``,
        ``note``, ``alternatives`` and ``price`` are always present.
        An unknown ingredient or unit is stored as None and recorded as an
        error; ``price`` is None whenever it cannot be computed.
        """
        ingredient = self.ctx.ingredients.get(dct["name"])
        if ingredient is None:
            self.ctx.issues.error(
                ISSUE_UNKNOWN_INGREDIENT, f"unknown ingredient {dct['name']}"
            )
        self["ingredient"] = ingredient

        if "amount" not in dct:
            self["amount"] = 1.0

        if "unit" in dct:
            unitstr = dct["unit"]
            unit = self.ctx.units.get(unitstr)
            if unit is None:
                self.ctx.issues.error(ISSUE_UNKNOWN_UNIT, f"unknown unit {unitstr}")
        else:
            unit = self.ctx.default_unit
        self["unit"] = unit

        if "note" not in dct:
            self["note"] = ""

        # Alternatives listed under "or" are full ingredient instances.
        self["alternatives"] = [
            IngredientInstance(self.ctx, ingdct) for ingdct in dct.get("or", [])
        ]

        # Price lookup needs both a known ingredient and a known unit. The
        # key is set in every case so readers never hit a KeyError.
        price = None
        if ingredient is not None and unit is not None:
            price = ingredient.getprice(self["amount"], unit)
        self["price"] = price
|
|
|
|
|
|
class Recipe(Element):
    """A recipe with its ingredient instances, subrecipes and total price."""

    def __init__(self, ctx: Context, dct: Dict[str, Any]) -> None:
        """Load the recipe and initialise its source/output paths to ""."""
        super().__init__(ctx, dct)
        self.srcpath = ""
        self.outpath = ""

    def load(self, dct: Dict[str, Any]) -> None:
        """Wrap ingredients and subrecipes in objects and sum up the price.

        ``ingredients`` and ``subrecipes`` are always set (empty lists when
        absent). ``price`` is the sum of the direct ingredients' prices, or
        None when there are no ingredients or any of them has no known
        price. Subrecipe prices are not included.
        """
        ingredients: List[IngredientInstance] = [
            IngredientInstance(self.ctx, ing) for ing in dct.get("ingredients", [])
        ]
        self["ingredients"] = ingredients

        subrecipes: List[Recipe] = [
            Recipe(self.ctx, partdct) for partdct in dct.get("subrecipes", [])
        ]
        self["subrecipes"] = subrecipes

        # An instance without a "price" key counts as "price unknown"
        # instead of raising KeyError.
        prices = [ing["price"] if "price" in ing else None for ing in ingredients]
        price: Optional[float] = None
        if prices and all(value is not None for value in prices):
            price = sum(prices)
        self["price"] = price
|
|
|
|
|
|
class Builder:
    """Loads units, ingredients and recipes and renders them to HTML."""

    def __init__(self) -> None:
        """Set up the Jinja environment, its filters and an empty context."""
        self.jinjaenv = jinja2.Environment(
            loader=jinja2.FileSystemLoader("templates"),
            autoescape=jinja2.select_autoescape(),
        )

        def numprint(value: float) -> str:
            """Format a number, dropping a trailing ".0"."""
            out = str(value)
            if out.endswith(".0"):
                return out.split(".", maxsplit=1)[0]
            return out

        fractions = {"0.5": "1/2", "0.25": "1/4", "0.75": "3/4"}

        def amountprint(value: float) -> str:
            """Like numprint, but show 0.5, 0.25 and 0.75 as fractions."""
            out = numprint(value)
            return fractions.get(out, out)

        self.jinjaenv.filters["numprint"] = numprint
        self.jinjaenv.filters["amountprint"] = amountprint
        self.ctx = Context()
        # list of output files that will be built
        self.outfiles: Set[str] = set()

    @staticmethod
    def _top_level_files(path: str) -> List[str]:
        """Return the names of the files directly inside ``path``.

        Files in subdirectories are ignored; a missing directory yields [].
        """
        # The first tuple produced by os.walk describes ``path`` itself.
        _, _, filenames = next(os.walk(path), (path, [], []))
        return filenames

    def load_file(self, file: str) -> Any:
        """Read ``file`` as UTF-8 and parse it as JSON (.json) or YAML."""
        print(f"loading {file}")
        with open(file, encoding="utf-8") as f:
            txt = f.read()
        if file.endswith(".json"):
            return json.loads(txt)
        return yaml.safe_load(txt)

    def rendertemplate(
        self, templatepath: str, format: str, file: str, dir: str, args: Any
    ) -> None:
        """Render a template into ``{dir}/out/{format}/{file}``.

        The output directory is created when needed and ``file`` is
        remembered in ``self.outfiles``.
        """
        template = self.jinjaenv.get_template(templatepath)
        print(f"rendering {file}")
        outstr = template.render(args)

        os.makedirs(f"{dir}/out/{format}", exist_ok=True)

        with open(f"{dir}/out/{format}/{file}", "w", encoding="utf-8") as f:
            f.write(outstr)
        self.outfiles.add(file)

    def load(self, dir: str) -> int:
        """Load and validate ``units.yaml`` and ``ingredients.yaml`` in ``dir``.

        Returns:
            0 on success, 1 when any error was recorded.

        Raises:
            jsonschema.exceptions.ValidationError: if a file does not match
                its schema.
        """
        unitsdct = self.load_file(dir + "/units.yaml")
        unitsschema = self.load_file("schemas/units.json")
        jsonschema.validate(instance=unitsdct, schema=unitsschema)
        self.ctx.units.load(unitsdct)
        # Validate before checking so duplicate units are reported right
        # away rather than together with the ingredient issues.
        self.ctx.units.validate()
        if self.ctx.issues.check() != 0:
            return 1

        ingredientsdct = self.load_file(dir + "/ingredients.yaml")
        ingredientsschema = self.load_file("schemas/ingredients.json")
        jsonschema.validate(instance=ingredientsdct, schema=ingredientsschema)
        self.ctx.ingredients.load(ingredientsdct)
        if self.ctx.issues.check() != 0:
            return 1

        return 0

    def run(self, dir: str) -> int:
        """Load every ``*.yaml`` recipe in ``dir``/recipes and render the pages.

        Files with another extension are reported and skipped.

        Returns:
            0 when all recipes loaded cleanly and the pages were rendered,
            1 (without rendering) when any recipe produced an error.
        """
        files = sorted(self._top_level_files(dir + "/recipes"))

        recipes: List[Recipe] = []
        recipeschema = self.load_file("schemas/recipe.json")
        failed = False
        for file in files:
            if not file.endswith(".yaml"):
                print(f"unknown extension of {file}")
                continue
            recipedct = self.load_file(dir + "/recipes/" + file)
            jsonschema.validate(instance=recipedct, schema=recipeschema)
            recipe = Recipe(self.ctx, recipedct)
            recipe.srcpath = file
            recipe.outpath = file[:-5] + ".html"
            # check() reports and clears the issues of this recipe, so the
            # failure has to be remembered here to affect the exit status.
            if self.ctx.issues.check() != 0:
                failed = True
                continue
            recipes.append(recipe)

        if failed:
            return 1

        self.rendertemplate(
            templatepath="index.html",
            format="html",
            file="index.html",
            dir=dir,
            args={"recipes": recipes},
        )
        for recipe in recipes:
            self.rendertemplate(
                templatepath="recipe.html",
                format="html",
                file=recipe.outpath,
                dir=dir,
                args={"recipe": recipe},
            )
        return 0

    def finish(self, dir: str) -> int:
        """Delete files in ``dir``/out/html that this build did not produce.

        Only files directly inside the directory are considered.
        Always returns 0.
        """
        files = set(self._top_level_files(f"{dir}/out/html"))

        # files we did not generate, probably left by a previous run, but not valid anymore
        extra_files = files - self.outfiles
        for file in extra_files:
            print(f"removing obsolete {file}")
            os.remove(f"{dir}/out/html/{file}")
        return 0

    def build(self, path: str) -> int:
        """Run load, run and finish in order, stopping at the first failure.

        Returns:
            0 on success, otherwise the non-zero code of the failing step;
            1 when a schema validation error occurred.
        """
        for step in (self.load, self.run, self.finish):
            try:
                ret = step(path)
            except jsonschema.exceptions.ValidationError as e:
                print("ERROR:", e)
                return 1
            if ret != 0:
                return ret
        return 0
|
|
|
|
|
|
def help() -> None:
    """Print the command-line usage, using the invoked program name."""
    prog = sys.argv[0]
    usage_lines = (
        f"usage: {prog} build DIR - build pages in DIR/out",
        f" {prog} -h - show help",
    )
    for line in usage_lines:
        print(line)
|
|
|
|
|
|
def main() -> None:
    """Dispatch on the command line and exit the process.

    ``build DIR`` runs the builder and exits with its return code, ``-h``
    prints the usage and exits with 0, anything else prints the usage and
    exits with 1.
    """
    args = sys.argv[1:]

    if len(args) == 2 and args[0] == "build":
        sys.exit(Builder().build(args[1]))

    # Every remaining case prints the usage; only a lone "-h" is a success.
    help()
    sys.exit(0 if args == ["-h"] else 1)
|
|
|
|
|
|
# Run the command-line entry point only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    main()
|