restructure
This commit is contained in:
parent
9e44070fe7
commit
81fdafb907
13 changed files with 376 additions and 366 deletions
comfyrecipes
496
comfyrecipes/parsing.py
Normal file
496
comfyrecipes/parsing.py
Normal file
|
@ -0,0 +1,496 @@
|
|||
from abc import abstractmethod
|
||||
import collections
|
||||
from typing import Any, Dict, List, Optional, Self
|
||||
|
||||
import comfyrecipes.settings as settings
|
||||
import comfyrecipes.issues as issues
|
||||
|
||||
import jsonschema
|
||||
|
||||
class Context:
|
||||
def __init__(self) -> None:
|
||||
self.settings = settings.Settings()
|
||||
self.units: AUnits = FakeUnits(self)
|
||||
self.default_unit = Unit(self, "piece", [], [])
|
||||
self.ingredients: AIngredients = FakeIngredients(self)
|
||||
self.issues = issues.Issues()
|
||||
|
||||
def load_units(
|
||||
self, unitsdct: List[Dict[str, Any]], unitsschema: Dict[str, Any]
|
||||
) -> None:
|
||||
self.units = Units(self)
|
||||
self.units.units.append(self.default_unit)
|
||||
jsonschema.validate(instance=unitsdct, schema=unitsschema)
|
||||
self.units.load(unitsdct)
|
||||
self.units.validate()
|
||||
|
||||
def load_ingredients(
|
||||
self, ingredientsdct: List[Dict[str, Any]], ingredientsschema: Dict[str, Any]
|
||||
) -> None:
|
||||
self.ingredients = Ingredients(self)
|
||||
jsonschema.validate(instance=ingredientsdct, schema=ingredientsschema)
|
||||
self.ingredients.load(ingredientsdct)
|
||||
|
||||
def load_settings(
|
||||
self, settingsdct: Dict[str, Any], settingsschema: Dict[str, Any]
|
||||
) -> None:
|
||||
jsonschema.validate(instance=settingsdct, schema=settingsschema)
|
||||
self.settings.load(settingsdct)
|
||||
|
||||
class Element:
|
||||
def __init__(self, ctx: Context) -> None:
|
||||
self.ctx = ctx
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
|
||||
return cls(ctx)
|
||||
|
||||
class Unit(Element):
|
||||
def __init__(
|
||||
self,
|
||||
ctx: Context,
|
||||
name: str,
|
||||
conversions: List["Conversion"],
|
||||
aliases: List[str],
|
||||
dct: Optional[Dict[str, Any]] = None,
|
||||
) -> None:
|
||||
super().__init__(ctx)
|
||||
self.name = name
|
||||
self.conversions = conversions
|
||||
self.aliases = aliases
|
||||
self.dct = dct
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
|
||||
# conversions are an empty list which is populated in finish_load
|
||||
# which should be ran when all units are registered
|
||||
aliases: List[str] = []
|
||||
if "aliases" in dct:
|
||||
aliases = dct["aliases"]
|
||||
|
||||
return cls(ctx, name=dct["name"], conversions=[], aliases=aliases, dct=dct)
|
||||
|
||||
def finish_load(self) -> None:
|
||||
if self.dct is None:
|
||||
return
|
||||
if "conversions" not in self.dct:
|
||||
return
|
||||
conversions = []
|
||||
for conv in self.dct["conversions"]:
|
||||
if "from" in conv:
|
||||
raise RuntimeError(
|
||||
"conversions in units.yaml cannot have a from field, it is automatically assigned from the unit name"
|
||||
)
|
||||
conv["from"] = self.dct["name"]
|
||||
conv = Conversion.from_dict(self.ctx, conv)
|
||||
conversions.append(conv)
|
||||
|
||||
self.conversions = conversions
|
||||
|
||||
|
||||
|
||||
class AUnits:
|
||||
def __init__(self, ctx: Context) -> None:
|
||||
self.ctx = ctx
|
||||
self.units: List[Unit] = []
|
||||
|
||||
def load(self, lst: List[Any]) -> None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get(self, name: str) -> Optional[Unit]:
|
||||
...
|
||||
|
||||
def validate(self) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class FakeUnits(AUnits):
|
||||
def get(self, name: str) -> Optional[Unit]:
|
||||
for unit in self.units:
|
||||
if unit.name == name:
|
||||
return unit
|
||||
unit = Unit(self.ctx, name, [], [])
|
||||
self.units.append(unit)
|
||||
return unit
|
||||
|
||||
|
||||
class Units(AUnits):
|
||||
def load(self, lst: List[Any]) -> None:
|
||||
for unitdct in lst:
|
||||
unit = Unit.from_dict(self.ctx, unitdct)
|
||||
self.units.append(unit)
|
||||
for unit in self.units:
|
||||
unit.finish_load()
|
||||
|
||||
def get(self, name: str) -> Optional[Unit]:
|
||||
for unit in self.units:
|
||||
if unit.name == name or name in unit.aliases:
|
||||
return unit
|
||||
return None
|
||||
|
||||
def validate(self) -> None:
|
||||
unitnames = []
|
||||
for unit in self.units:
|
||||
unitnames.append(unit.name)
|
||||
for unitname, num in collections.Counter(unitnames).items():
|
||||
if num > 1:
|
||||
self.ctx.issues.error(
|
||||
issues.ISSUE_DUPLICATE_UNITS,
|
||||
f"units.yaml: {unitname} should only have one entry, found {num}",
|
||||
)
|
||||
|
||||
class IngredientInstance(Element):
|
||||
def __init__(
|
||||
self,
|
||||
ctx: Context,
|
||||
name: str,
|
||||
amount: float,
|
||||
unit: Unit,
|
||||
alternatives: List["IngredientInstance"],
|
||||
note: str,
|
||||
price: Optional["PriceDB"],
|
||||
) -> None:
|
||||
super().__init__(ctx)
|
||||
self.name = name
|
||||
self.amount = amount
|
||||
self.unit = unit
|
||||
self.alternatives = alternatives
|
||||
self.note = note
|
||||
self.price = price
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
|
||||
name = dct["name"]
|
||||
|
||||
ingredient = ctx.ingredients.get(name)
|
||||
if ingredient is None:
|
||||
ctx.issues.error(
|
||||
issues.ISSUE_UNKNOWN_INGREDIENT, f"unknown ingredient {dct['name']}"
|
||||
)
|
||||
|
||||
amount = 1.0
|
||||
if "amount" in dct:
|
||||
amount = dct["amount"]
|
||||
|
||||
unit = ctx.default_unit
|
||||
if "unit" in dct:
|
||||
unitstr = dct["unit"]
|
||||
unitx = ctx.units.get(unitstr)
|
||||
if unitx is None:
|
||||
ctx.issues.error(issues.ISSUE_UNKNOWN_UNIT, "unknown unit {unitstr}")
|
||||
else:
|
||||
unit = unitx
|
||||
|
||||
note = ""
|
||||
if "note" in dct:
|
||||
note = dct["note"]
|
||||
|
||||
alternatives = []
|
||||
if "or" in dct:
|
||||
for ingdct in dct["or"]:
|
||||
ing = IngredientInstance.from_dict(ctx, ingdct)
|
||||
alternatives.append(ing)
|
||||
|
||||
price = None
|
||||
if ingredient is not None:
|
||||
price = ingredient.getprice(amount, unit)
|
||||
return cls(
|
||||
ctx=ctx,
|
||||
name=name,
|
||||
amount=amount,
|
||||
unit=unit,
|
||||
alternatives=alternatives,
|
||||
note=note,
|
||||
price=price,
|
||||
)
|
||||
|
||||
|
||||
class Recipe(Element):
|
||||
def __init__(
|
||||
self,
|
||||
ctx: Context,
|
||||
title: str,
|
||||
ingredients: List[IngredientInstance],
|
||||
subrecipes: List["Recipe"],
|
||||
price: Optional["PriceDB"],
|
||||
steps: List[str],
|
||||
) -> None:
|
||||
super().__init__(ctx)
|
||||
self.srcpath = ""
|
||||
self.outpath = ""
|
||||
self.title = title
|
||||
self.ingredients = ingredients
|
||||
self.subrecipes = subrecipes
|
||||
self.price = price
|
||||
self.steps = steps
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
|
||||
ingredients: List[IngredientInstance] = []
|
||||
if "ingredients" in dct:
|
||||
for ingdct in dct["ingredients"]:
|
||||
ingredient = IngredientInstance.from_dict(ctx, ingdct)
|
||||
ingredients.append(ingredient)
|
||||
|
||||
subrecipes: List[Recipe] = []
|
||||
if "subrecipes" in dct:
|
||||
for partdct in dct["subrecipes"]:
|
||||
rp = Recipe.from_dict(ctx, partdct)
|
||||
subrecipes.append(rp)
|
||||
|
||||
price: Optional[PriceDB] = None
|
||||
pricex: float = 0
|
||||
ingswithprice = 0
|
||||
ingswithoutprice = 0
|
||||
currency = None
|
||||
for ing in ingredients:
|
||||
if ing.price is None:
|
||||
ingswithoutprice += 1
|
||||
continue
|
||||
ingswithprice += 1
|
||||
pricex += ing.price.price
|
||||
cur_currency = ing.price.currency
|
||||
if currency is None:
|
||||
currency = cur_currency
|
||||
elif currency != cur_currency:
|
||||
# we don't know how to convert currencies yet
|
||||
currency = None
|
||||
break
|
||||
if currency is None or ingswithoutprice != 0 or len(ingredients) == 0:
|
||||
price = None
|
||||
else:
|
||||
price = PriceDB(
|
||||
ctx=ctx,
|
||||
price=pricex,
|
||||
amount=1,
|
||||
unit=ctx.default_unit,
|
||||
currency=currency,
|
||||
)
|
||||
|
||||
steps = []
|
||||
if "steps" in dct:
|
||||
steps = dct["steps"]
|
||||
return cls(
|
||||
ctx=ctx,
|
||||
title=dct["title"],
|
||||
ingredients=ingredients,
|
||||
subrecipes=subrecipes,
|
||||
price=price,
|
||||
steps=steps,
|
||||
)
|
||||
|
||||
class Conversion(Element):
|
||||
def __init__(
|
||||
self, ctx: Context, fromunit: Unit, tounit: Unit, ratio: float
|
||||
) -> None:
|
||||
super().__init__(ctx)
|
||||
self.fromunit = fromunit
|
||||
self.tounit = tounit
|
||||
self.ratio = ratio
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
|
||||
fromunit = ctx.units.get(dct["from"])
|
||||
if fromunit is None:
|
||||
raise RuntimeError(f"unit {dct['from']} doesn't exist")
|
||||
|
||||
tounit = ctx.units.get(dct["to"])
|
||||
if tounit is None:
|
||||
raise RuntimeError(f"unit {dct['to']} doesn't exist")
|
||||
return cls(ctx, fromunit, tounit, dct["ratio"])
|
||||
|
||||
class Ingredient(Element):
|
||||
def __init__(
|
||||
self,
|
||||
ctx: Context,
|
||||
name: str,
|
||||
wdid: Optional[int],
|
||||
prices: List["PriceDB"],
|
||||
conversions: List[Conversion],
|
||||
aliases: List[str],
|
||||
) -> None:
|
||||
super().__init__(ctx)
|
||||
self.name = name
|
||||
self.wdid = wdid
|
||||
self.prices = prices
|
||||
self.conversions = conversions
|
||||
self.aliases = aliases
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
|
||||
wdid = None
|
||||
if "wdid" in dct:
|
||||
wdid = dct["wdid"]
|
||||
|
||||
prices: List[PriceDB] = []
|
||||
if "prices" in dct:
|
||||
for pricedct in dct["prices"]:
|
||||
prices.append(PriceDB.from_dict(ctx, pricedct))
|
||||
|
||||
conversions: List[Conversion] = []
|
||||
if "conversions" in dct:
|
||||
for convdct in dct["conversions"]:
|
||||
conversion = Conversion.from_dict(ctx, convdct)
|
||||
conversions.append(conversion)
|
||||
|
||||
aliases: List[str] = []
|
||||
if "aliases" in dct:
|
||||
aliases = dct["aliases"]
|
||||
|
||||
return cls(
|
||||
ctx,
|
||||
name=dct["name"],
|
||||
wdid=wdid,
|
||||
prices=prices,
|
||||
conversions=conversions,
|
||||
aliases=aliases,
|
||||
)
|
||||
|
||||
def dfs(
|
||||
self,
|
||||
conversions: Dict[str, Dict[str, float]],
|
||||
startname: str,
|
||||
endname: str,
|
||||
visited: Optional[List[str]] = None,
|
||||
) -> Optional[List[str]]:
|
||||
if visited is None:
|
||||
visited = []
|
||||
if startname == endname:
|
||||
return visited + [startname]
|
||||
|
||||
for nextunit in conversions[startname].keys():
|
||||
if nextunit in visited:
|
||||
continue
|
||||
result = self.dfs(conversions, nextunit, endname, visited + [startname])
|
||||
if result is not None:
|
||||
return result
|
||||
return None
|
||||
|
||||
def convert(self, amount: float, unitfrom: Unit, unitto: Unit) -> Optional[float]:
|
||||
conversions: Dict[str, Dict[str, float]] = collections.defaultdict(dict)
|
||||
# construct node tree
|
||||
convs = self.conversions
|
||||
for unit in self.ctx.units.units:
|
||||
convs += unit.conversions
|
||||
for conv in convs:
|
||||
fromname = conv.fromunit.name
|
||||
toname = conv.tounit.name
|
||||
conversions[fromname][toname] = conv.ratio
|
||||
conversions[toname][fromname] = 1 / conv.ratio
|
||||
|
||||
# find path between conversions
|
||||
path = self.dfs(conversions, unitfrom.name, unitto.name)
|
||||
if path is None:
|
||||
self.ctx.issues.warn(
|
||||
issues.ISSUE_KNOWN_PRICE_UNKNOWN_CONVERSION,
|
||||
f"{self.name} has a known price, but conversion {unitfrom.name} -> {unitto.name} not known",
|
||||
)
|
||||
return None
|
||||
assert len(path) != 0
|
||||
oldelem = path[0]
|
||||
for elem in path[1:]:
|
||||
amount *= conversions[oldelem][elem]
|
||||
oldelem = elem
|
||||
return amount
|
||||
|
||||
def getprice(self, amount: float, unit: Unit) -> Optional["PriceDB"]:
|
||||
prices: List[PriceDB] = []
|
||||
pricedbs: List[PriceDB] = self.prices
|
||||
for entry in pricedbs:
|
||||
assert isinstance(entry, PriceDB)
|
||||
entryamount: float = entry.amount
|
||||
entryprice: float = entry.price
|
||||
entryunit: Unit = entry.unit
|
||||
price = 0.0
|
||||
if entryunit == unit:
|
||||
price = (amount / entryamount) * entryprice
|
||||
else:
|
||||
pricex = self.convert(amount, unit, entryunit)
|
||||
if pricex is not None:
|
||||
price = (pricex / entryamount) * entryprice
|
||||
newentry = PriceDB(
|
||||
self.ctx,
|
||||
price=price,
|
||||
amount=amount,
|
||||
unit=unit,
|
||||
currency=entry.currency,
|
||||
)
|
||||
prices.append(newentry)
|
||||
if len(prices) == 0:
|
||||
return None
|
||||
assert len(prices) == 1
|
||||
return prices[0]
|
||||
|
||||
|
||||
class AIngredients:
|
||||
def __init__(self, ctx: Context) -> None:
|
||||
self.ctx = ctx
|
||||
self.ingredients: List[Ingredient] = []
|
||||
|
||||
def load(self, lst: List[Any]) -> None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get(self, name: str) -> Optional[Ingredient]:
|
||||
...
|
||||
|
||||
|
||||
class FakeIngredients(AIngredients):
|
||||
def get(self, name: str) -> Optional[Ingredient]:
|
||||
for ing in self.ingredients:
|
||||
if ing.name == name:
|
||||
return ing
|
||||
ing = Ingredient(self.ctx, name, None, [], [], [])
|
||||
self.ingredients.append(ing)
|
||||
return ing
|
||||
|
||||
|
||||
class Ingredients(AIngredients):
|
||||
def load(self, lst: List[Any]) -> None:
|
||||
for ingdct in lst:
|
||||
ing = Ingredient.from_dict(self.ctx, ingdct)
|
||||
self.ingredients.append(ing)
|
||||
|
||||
def get(self, name: str) -> Optional[Ingredient]:
|
||||
for ing in self.ingredients:
|
||||
if ing.name == name or name in ing.aliases:
|
||||
return ing
|
||||
return None
|
||||
|
||||
|
||||
class PriceDB(Element):
|
||||
def __init__(
|
||||
self,
|
||||
ctx: Context,
|
||||
price: float,
|
||||
amount: float,
|
||||
unit: Unit,
|
||||
currency: Optional[str],
|
||||
) -> None:
|
||||
super().__init__(ctx)
|
||||
self.price = price
|
||||
self.amount = amount
|
||||
self.unit = unit
|
||||
self.currency = currency
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, ctx: Context, dct: Dict[str, Any]) -> Self:
|
||||
price = dct["price"]
|
||||
amount = dct["amount"]
|
||||
|
||||
unitstr = dct["unit"]
|
||||
unitx = ctx.units.get(unitstr)
|
||||
# uses the default unit if unit is not known (this won't be
|
||||
# visible to the user since rendering will stop after
|
||||
# everything is evaluated)
|
||||
unit = ctx.default_unit
|
||||
if unitx is None:
|
||||
ctx.issues.error(issues.ISSUE_UNKNOWN_UNIT, f"unknown unit {unitstr}")
|
||||
else:
|
||||
unit = unitx
|
||||
|
||||
currency = ctx.settings.default_currency
|
||||
if "currency" in dct:
|
||||
currency = dct["currency"]
|
||||
return cls(ctx=ctx, price=price, amount=amount, unit=unit, currency=currency)
|
Loading…
Add table
Add a link
Reference in a new issue