Module hoshmap.frozenidict
Expand source code
import json
import re
from collections import UserDict
from functools import cached_property
from typing import Dict, TypeVar, Union
from hosh import ø
from hoshmap.let import Let
from hoshmap.serialization.customjson import CustomJSONEncoder
from hoshmap.value import LazyiVal
from hoshmap.value.dfival import explode_df
from hoshmap.value.strictival import StrictiVal, iVal
VT = TypeVar("VT")
class FrozenIdict(UserDict, Dict[str, VT]):
"""
Frozen version of Idict
Nested idicts become frozen for consistent identities.
Immutable values don't mess with their respective identifiers.
>>> "x" in FrozenIdict(x=2)
True
"""
_evaluated = None
# noinspection PyMissingConstructor
def __init__(self, /, _dictionary=None, **kwargs):
from hoshmap.idict import Idict
data: Dict[str, Union[iVal, str, dict]] = _dictionary or {}
data.update(kwargs)
if "_id" in data.keys() or "_ids" in data.keys(): # pragma: no cover
raise Exception(f"Cannot have a field named '_id'/'_ids': {data.keys()}")
self.data: Dict[str, iVal] = {}
self.hosh = self.mhosh = ø
self.ids = {}
self.mids = {}
for k, v in data.items():
if isinstance(v, iVal):
self.data[k] = v
else:
if isinstance(v, Idict):
self.data[k] = StrictiVal(v.frozen, v.hosh)
elif str(type(v)) == "<class 'pandas.core.frame.DataFrame'>":
self.data[k], self.data[k + "_"] = explode_df(v)
else:
self.data[k] = StrictiVal(v)
if k.startswith("_") and k not in ["_id", "_ids"]:
pass
# TODO: add mehosh (for metafields)?
# TODO: add mihosh (for mirrorfields)?
# TODO: specify new type of field: mirrorfield, e.g.: 'df_' is a mirror/derived from 'df'
# self.mhosh += self.data[k].hosh * k.encode()
# self.mids[k] = self.data[k].hosh.id
else:
self.hosh += self.data[k].hosh * k.encode()
# PAPER9: remember to state in the paper that hash(identifier) must be different from hash(value), for any identifier and value. E.g.: hash(X) != hash("X")
# Here the difference always happen because values are pickled, while identifiers are just encoded().
self.ids[k] = self.data[k].hosh.id # TODO: separate mids from ids?
# noinspection PyTypeChecker
self.data["_id"] = self.id = self.hosh.id
self.data["_ids"] = self.ids
# self.data["_mid"] = self.id = self.hosh.id
# self.data["_mids"] = self.ids
@staticmethod
def fromdict(dictionary, ids):
"""Build a frozenidict from values and pre-defined ids"""
data = {}
for k, v in dictionary.items():
if isinstance(v, iVal):
if k in ids and ids[k] != v.id: # pragma: no cover
raise Exception(f"Conflicting ids provided for key '{k}': ival.id={v.id}; ids[{k}]={ids[k]}")
data[k] = v
else:
data[k] = StrictiVal(v, ids[k])
return FrozenIdict(data)
@property
def evaluated(self):
if self._evaluated is None:
self._evaluated = self.evaluate()
return self
def evaluate(self):
for k, ival in self.data.items():
if k not in ["_id", "_ids"]:
ival.evaluate()
return self
@cached_property
def asdict(self):
dic = {k: v for k, v in self.entries()}
dic["_id"] = self.id
dic["_ids"] = self.ids.copy()
return dic
@cached_property
def asdicts(self):
dic = {}
for k, v in self.entries():
dic[k] = v.asdicts if isinstance(v, FrozenIdict) else v
dic["_id"] = self.id
dic["_ids"] = self.ids.copy()
return dic
@cached_property
def asdicts_hoshes_noneval(self):
from hoshmap.value.cacheableival import CacheableiVal
hoshes = set()
dic = {}
for k, ival in self.data.items():
if k not in ["_id", "_ids"]:
hoshes.add(ival.hosh)
if isinstance(ival, CacheableiVal):
dic[k] = ival
else:
v = ival.value
if isinstance(v, FrozenIdict):
dic[k], subhoshes = v.asdicts_hoshes_noneval
hoshes.update(subhoshes)
else:
dic[k] = v
hoshes.add(self.hosh)
dic["_id"] = self.id
dic["_ids"] = self.ids.copy()
return dic, hoshes
def astext(self, colored=True, key_quotes=False):
r"""Textual representation of a frozenidict object"""
dicts, hoshes = self.asdicts_hoshes_noneval
txt = json.dumps(dicts, indent=4, ensure_ascii=False, cls=CustomJSONEncoder)
# Put colors after json, to avoid escaping ansi codes.
if colored:
for h in hoshes:
txt = txt.replace(f'"{h.id}"', h.idc)
txt = re.sub(r'(": )"(λ.+?)"(?=,\n)', '": \\2', txt)
if not key_quotes:
txt = re.sub(r'(?<!: )"([a-zA-Z0-9_ ]+?)"(?=: )', "\\1", txt)
return txt
def show(self, colored=True, key_quotes=False):
r"""Print textual representation of a frozenidict object"""
print(self.astext(colored, key_quotes))
def copy(self): # pragma: no cover
raise Exception("A FrozenIdict doesn't need copies")
@property
def unfrozen(self):
from hoshmap.idict import Idict
return Idict(_frozen=self)
def __setitem__(self, key: str, value): # pragma: no cover
print(value)
raise Exception(f"Cannot set an entry ({key}) of a frozen dict.")
def __delitem__(self, key): # pragma: no cover
raise Exception(f"Cannot delete an entry ({key}) from a frozen dict.")
def __repr__(self):
return self.astext()
def __str__(self):
js = json.dumps(self.data, ensure_ascii=False, cls=CustomJSONEncoder)
return re.sub(r'(?<!: )"(\S*?)"', "\\1", js)
def __getitem__(self, item):
return self.data[item] if item in ["_id", "_ids"] else self.data[item].value
def __getattr__(self, item): # pragma: no cover
if item in self.data:
return self.data[item].value
return self.__getattribute__(item)
def __rshift__(self, other):
data = self.data.copy()
del data["_id"]
del data["_ids"]
if isinstance(other, tuple):
other = Let(other[0], other[1], *other[2:])
from hoshmap.idict import Idict
if isinstance(other, Idict): # merge
other = self.frozen
if isinstance(other, FrozenIdict): # merge
other = other.data
if isinstance(other, dict): # merge
clone = self
data0 = {}
for k, v in other.items():
if callable(v):
key = k
if isinstance(key, tuple):
key = ",".join(key)
if not key.startswith("→") or not key.startswith("->"):
key = f"→{key}"
clone = clone >> (v, key)
else:
data0[k] = v
data.update(clone.entries(evaluate=False))
data.update(data0)
return FrozenIdict(data)
# if not isinstance(other, list) and hasattr(other, "__setitem__") and hasattr(other, "__getitem__"):
# other = [other]
if isinstance(other, list):
caches = []
strict = []
for cache in other:
if isinstance(cache, list):
cache = cache[0]
strict.append(cache)
caches.append(cache)
for key, ival in self.entries(evaluate=False):
if ival.isevaluated:
for cache in strict:
# TODO: esse IF evita gravar toda hora, mas impede que metafields sejam atualizados
# usar um (mfid1 * mfid2 * ... * mfidn) pra manter link p/ mfs e tb saber se já existe
# if self.mid not in cache:
# cache[self.mid] = {"_ids": self.mids}
if self.id not in cache:
cache[self.id] = {"_ids": self.ids}
if ival.id not in cache:
if isinstance(ival.value, FrozenIdict):
ival.value >> [[cache]]
cache[ival.id] = ival.value # REMINDER: the dict-like cache should pack() the value if it wants.
else:
data[key] = ival.withcaches(caches, self.id, self.ids)
return FrozenIdict(data)
if isinstance(other, Let):
n = len(other.output)
deps = {}
for fin_source, fin_target in other.input.items():
if fin_source in data:
val = data[fin_source]
elif fin_source in other.input_space: # TODO: rnd
# val = other.rnd.choice(other.input_space[fin_source])
print("TODO: rnd")
pass
elif fin_source in other.input_values:
val = other.input_values[fin_source]
else: # pragma: no cover
# TODO: partial application [put multiple references for same iPartial if multioutput]
# raise exc if missing right most argument
raise Exception(
f"Missing field '{fin_source}'.\n"
f"idict:{list(self.keys())}\n"
f"let input:{other.input}\n"
f"let default:{other.input_values}\n"
f"let space:{other.input_space}"
)
if isinstance(val, iVal):
# REMINDER: only here 'val' comes from a field (not from Let)
deps[fin_target] = val
elif str(type(val)) == "<class 'pandas.core.frame.DataFrame'>":
data[fin_target], data[fin_target + "_"] = explode_df(val)
deps[fin_target] = data[fin_target]
else:
deps[fin_target] = StrictiVal(val)
shared_result = {}
for i, fout in enumerate(other.output):
data[fout] = LazyiVal(other.f, i, n, deps, shared_result, fid=other.id)
return FrozenIdict(data)
return NotImplemented # pragma: no cover
def __eq__(self, other):
if isinstance(other, dict):
if "_id" in other:
return self.id == other["_id"]
if list(self.keys()) != list(other.keys()):
return False
if isinstance(other, dict):
self.evaluate()
data = self.asdict
del data["_id"]
del data["_ids"]
return data == other
raise TypeError(f"Cannot compare {type(self)} and {type(other)}") # pragma: no cover
def __ne__(self, other):
return not (self == other)
def __reduce__(self):
# TODO: pickling idicts shouldn't be necessary after cache[id]=DFiVal is implemented
dic = self.data.copy()
ids = dic.pop("_ids").copy()
del dic["_id"]
return self.fromdict, (dic, ids)
def keys(self):
"""Generator of keys which don't start with '_'"""
return (k for k in self.data if not k.startswith("_"))
def values(self, evaluate=True):
"""Generator of field values (keys that don't start with '_')"""
return ((v.value if evaluate else v) for k, v in self.data.items() if not k.startswith("_"))
def items(self, evaluate=True):
"""Generator over field-value pairs"""
for k, ival in self.data.items():
if not k.startswith("_"):
yield k, (ival.value if evaluate else ival)
def metakeys(self):
"""Generator of keys which start with '_'"""
return (k for k in self.data if k.startswith("_") and k not in ["_id", "_ids"])
def metavalues(self, evaluate=True):
"""Generator of field values (keys don't start with '_')"""
return ((v.value if evaluate else v) for k, v in self.data.items() if k.startswith("_") and k not in ["_id", "_ids"])
def metaitems(self, evaluate=True):
"""Generator over field-value pairs"""
for k, ival in self.data.items():
if k.startswith("_") and k not in ["_id", "_ids"]:
yield k, (ival.value if evaluate else ival)
def entries(self, evaluate=True):
"""Iterator over all items"""
yield from self.items(evaluate)
yield from self.metaitems(evaluate)
@cached_property
def fields(self):
return list(self.keys())
@cached_property
def aslist(self):
return list(self.values())
@cached_property
def metafields(self):
"""List of keys which start with '_'"""
return [k for k in self.data if k.startswith("_") and k not in ["_id", "_ids"]]
@staticmethod
def fromid(id, cache) -> Union["FrozenIdict", None]:
return FrozenIdict.fetch(id, cache, isidict=True)
@staticmethod
def fetch(id, cache, isidict=False) -> Union["FrozenIdict", None]:
caches = cache if isinstance(cache, list) else [cache]
while id not in (cache := caches.pop(0)):
if not caches:
raise Exception(f"id '{id}' not found in the provided cache {cache.keys()}.")
obj = cache[id]
if isinstance(obj, dict):
if "_ids" not in obj:
raise Exception(f"Wrong content for idict under id {id}: missing '_ids' fields ({obj.keys()}).")
isidict = True
elif isidict:
raise Exception(f"Wrong content for idict expected under id {id}: {type(obj)}.")
# TODO: adopt lazy fetch: CacheableiVal as 'return' value
if isidict:
ids = obj["_ids"]
data = {}
for k, v in ids.items():
data[k] = FrozenIdict.fetch(v, cache)
return FrozenIdict.fromdict(data, ids)
return obj
Classes
class FrozenIdict (**kwargs)-
Frozen version of Idict
Nested idicts become frozen for consistent identities. Immutable values don't mess with their respective identifiers.
>>> "x" in FrozenIdict(x=2) TrueExpand source code
class FrozenIdict(UserDict, Dict[str, VT]): """ Frozen version of Idict Nested idicts become frozen for consistent identities. Immutable values don't mess with their respective identifiers. >>> "x" in FrozenIdict(x=2) True """ _evaluated = None # noinspection PyMissingConstructor def __init__(self, /, _dictionary=None, **kwargs): from hoshmap.idict import Idict data: Dict[str, Union[iVal, str, dict]] = _dictionary or {} data.update(kwargs) if "_id" in data.keys() or "_ids" in data.keys(): # pragma: no cover raise Exception(f"Cannot have a field named '_id'/'_ids': {data.keys()}") self.data: Dict[str, iVal] = {} self.hosh = self.mhosh = ø self.ids = {} self.mids = {} for k, v in data.items(): if isinstance(v, iVal): self.data[k] = v else: if isinstance(v, Idict): self.data[k] = StrictiVal(v.frozen, v.hosh) elif str(type(v)) == "<class 'pandas.core.frame.DataFrame'>": self.data[k], self.data[k + "_"] = explode_df(v) else: self.data[k] = StrictiVal(v) if k.startswith("_") and k not in ["_id", "_ids"]: pass # TODO: add mehosh (for metafields)? # TODO: add mihosh (for mirrorfields)? # TODO: specify new type of field: mirrorfield, e.g.: 'df_' is a mirror/derived from 'df' # self.mhosh += self.data[k].hosh * k.encode() # self.mids[k] = self.data[k].hosh.id else: self.hosh += self.data[k].hosh * k.encode() # PAPER9: remember to state in the paper that hash(identifier) must be different from hash(value), for any identifier and value. E.g.: hash(X) != hash("X") # Here the difference always happen because values are pickled, while identifiers are just encoded(). self.ids[k] = self.data[k].hosh.id # TODO: separate mids from ids? # noinspection PyTypeChecker self.data["_id"] = self.id = self.hosh.id self.data["_ids"] = self.ids # self.data["_mid"] = self.id = self.hosh.id # self.data["_mids"] = self.ids @staticmethod def fromdict(dictionary, ids): """Build a frozenidict from values and pre-defined ids""" data = {} for k, v in dictionary.items(): if isinstance(v, iVal): if k in ids and ids[k] != v.id: # pragma: no cover raise Exception(f"Conflicting ids provided for key '{k}': ival.id={v.id}; ids[{k}]={ids[k]}") data[k] = v else: data[k] = StrictiVal(v, ids[k]) return FrozenIdict(data) @property def evaluated(self): if self._evaluated is None: self._evaluated = self.evaluate() return self def evaluate(self): for k, ival in self.data.items(): if k not in ["_id", "_ids"]: ival.evaluate() return self @cached_property def asdict(self): dic = {k: v for k, v in self.entries()} dic["_id"] = self.id dic["_ids"] = self.ids.copy() return dic @cached_property def asdicts(self): dic = {} for k, v in self.entries(): dic[k] = v.asdicts if isinstance(v, FrozenIdict) else v dic["_id"] = self.id dic["_ids"] = self.ids.copy() return dic @cached_property def asdicts_hoshes_noneval(self): from hoshmap.value.cacheableival import CacheableiVal hoshes = set() dic = {} for k, ival in self.data.items(): if k not in ["_id", "_ids"]: hoshes.add(ival.hosh) if isinstance(ival, CacheableiVal): dic[k] = ival else: v = ival.value if isinstance(v, FrozenIdict): dic[k], subhoshes = v.asdicts_hoshes_noneval hoshes.update(subhoshes) else: dic[k] = v hoshes.add(self.hosh) dic["_id"] = self.id dic["_ids"] = self.ids.copy() return dic, hoshes def astext(self, colored=True, key_quotes=False): r"""Textual representation of a frozenidict object""" dicts, hoshes = self.asdicts_hoshes_noneval txt = json.dumps(dicts, indent=4, ensure_ascii=False, cls=CustomJSONEncoder) # Put colors after json, to avoid escaping ansi codes. if colored: for h in hoshes: txt = txt.replace(f'"{h.id}"', h.idc) txt = re.sub(r'(": )"(λ.+?)"(?=,\n)', '": \\2', txt) if not key_quotes: txt = re.sub(r'(?<!: )"([a-zA-Z0-9_ ]+?)"(?=: )', "\\1", txt) return txt def show(self, colored=True, key_quotes=False): r"""Print textual representation of a frozenidict object""" print(self.astext(colored, key_quotes)) def copy(self): # pragma: no cover raise Exception("A FrozenIdict doesn't need copies") @property def unfrozen(self): from hoshmap.idict import Idict return Idict(_frozen=self) def __setitem__(self, key: str, value): # pragma: no cover print(value) raise Exception(f"Cannot set an entry ({key}) of a frozen dict.") def __delitem__(self, key): # pragma: no cover raise Exception(f"Cannot delete an entry ({key}) from a frozen dict.") def __repr__(self): return self.astext() def __str__(self): js = json.dumps(self.data, ensure_ascii=False, cls=CustomJSONEncoder) return re.sub(r'(?<!: )"(\S*?)"', "\\1", js) def __getitem__(self, item): return self.data[item] if item in ["_id", "_ids"] else self.data[item].value def __getattr__(self, item): # pragma: no cover if item in self.data: return self.data[item].value return self.__getattribute__(item) def __rshift__(self, other): data = self.data.copy() del data["_id"] del data["_ids"] if isinstance(other, tuple): other = Let(other[0], other[1], *other[2:]) from hoshmap.idict import Idict if isinstance(other, Idict): # merge other = self.frozen if isinstance(other, FrozenIdict): # merge other = other.data if isinstance(other, dict): # merge clone = self data0 = {} for k, v in other.items(): if callable(v): key = k if isinstance(key, tuple): key = ",".join(key) if not key.startswith("→") or not key.startswith("->"): key = f"→{key}" clone = clone >> (v, key) else: data0[k] = v data.update(clone.entries(evaluate=False)) data.update(data0) return FrozenIdict(data) # if not isinstance(other, list) and hasattr(other, "__setitem__") and hasattr(other, "__getitem__"): # other = [other] if isinstance(other, list): caches = [] strict = [] for cache in other: if isinstance(cache, list): cache = cache[0] strict.append(cache) caches.append(cache) for key, ival in self.entries(evaluate=False): if ival.isevaluated: for cache in strict: # TODO: esse IF evita gravar toda hora, mas impede que metafields sejam atualizados # usar um (mfid1 * mfid2 * ... * mfidn) pra manter link p/ mfs e tb saber se já existe # if self.mid not in cache: # cache[self.mid] = {"_ids": self.mids} if self.id not in cache: cache[self.id] = {"_ids": self.ids} if ival.id not in cache: if isinstance(ival.value, FrozenIdict): ival.value >> [[cache]] cache[ival.id] = ival.value # REMINDER: the dict-like cache should pack() the value if it wants. else: data[key] = ival.withcaches(caches, self.id, self.ids) return FrozenIdict(data) if isinstance(other, Let): n = len(other.output) deps = {} for fin_source, fin_target in other.input.items(): if fin_source in data: val = data[fin_source] elif fin_source in other.input_space: # TODO: rnd # val = other.rnd.choice(other.input_space[fin_source]) print("TODO: rnd") pass elif fin_source in other.input_values: val = other.input_values[fin_source] else: # pragma: no cover # TODO: partial application [put multiple references for same iPartial if multioutput] # raise exc if missing right most argument raise Exception( f"Missing field '{fin_source}'.\n" f"idict:{list(self.keys())}\n" f"let input:{other.input}\n" f"let default:{other.input_values}\n" f"let space:{other.input_space}" ) if isinstance(val, iVal): # REMINDER: only here 'val' comes from a field (not from Let) deps[fin_target] = val elif str(type(val)) == "<class 'pandas.core.frame.DataFrame'>": data[fin_target], data[fin_target + "_"] = explode_df(val) deps[fin_target] = data[fin_target] else: deps[fin_target] = StrictiVal(val) shared_result = {} for i, fout in enumerate(other.output): data[fout] = LazyiVal(other.f, i, n, deps, shared_result, fid=other.id) return FrozenIdict(data) return NotImplemented # pragma: no cover def __eq__(self, other): if isinstance(other, dict): if "_id" in other: return self.id == other["_id"] if list(self.keys()) != list(other.keys()): return False if isinstance(other, dict): self.evaluate() data = self.asdict del data["_id"] del data["_ids"] return data == other raise TypeError(f"Cannot compare {type(self)} and {type(other)}") # pragma: no cover def __ne__(self, other): return not (self == other) def __reduce__(self): # TODO: pickling idicts shouldn't be necessary after cache[id]=DFiVal is implemented dic = self.data.copy() ids = dic.pop("_ids").copy() del dic["_id"] return self.fromdict, (dic, ids) def keys(self): """Generator of keys which don't start with '_'""" return (k for k in self.data if not k.startswith("_")) def values(self, evaluate=True): """Generator of field values (keys that don't start with '_')""" return ((v.value if evaluate else v) for k, v in self.data.items() if not k.startswith("_")) def items(self, evaluate=True): """Generator over field-value pairs""" for k, ival in self.data.items(): if not k.startswith("_"): yield k, (ival.value if evaluate else ival) def metakeys(self): """Generator of keys which start with '_'""" return (k for k in self.data if k.startswith("_") and k not in ["_id", "_ids"]) def metavalues(self, evaluate=True): """Generator of field values (keys don't start with '_')""" return ((v.value if evaluate else v) for k, v in self.data.items() if k.startswith("_") and k not in ["_id", "_ids"]) def metaitems(self, evaluate=True): """Generator over field-value pairs""" for k, ival in self.data.items(): if k.startswith("_") and k not in ["_id", "_ids"]: yield k, (ival.value if evaluate else ival) def entries(self, evaluate=True): """Iterator over all items""" yield from self.items(evaluate) yield from self.metaitems(evaluate) @cached_property def fields(self): return list(self.keys()) @cached_property def aslist(self): return list(self.values()) @cached_property def metafields(self): """List of keys which start with '_'""" return [k for k in self.data if k.startswith("_") and k not in ["_id", "_ids"]] @staticmethod def fromid(id, cache) -> Union["FrozenIdict", None]: return FrozenIdict.fetch(id, cache, isidict=True) @staticmethod def fetch(id, cache, isidict=False) -> Union["FrozenIdict", None]: caches = cache if isinstance(cache, list) else [cache] while id not in (cache := caches.pop(0)): if not caches: raise Exception(f"id '{id}' not found in the provided cache {cache.keys()}.") obj = cache[id] if isinstance(obj, dict): if "_ids" not in obj: raise Exception(f"Wrong content for idict under id {id}: missing '_ids' fields ({obj.keys()}).") isidict = True elif isidict: raise Exception(f"Wrong content for idict expected under id {id}: {type(obj)}.") # TODO: adopt lazy fetch: CacheableiVal as 'return' value if isidict: ids = obj["_ids"] data = {} for k, v in ids.items(): data[k] = FrozenIdict.fetch(v, cache) return FrozenIdict.fromdict(data, ids) return objAncestors
- collections.UserDict
- collections.abc.MutableMapping
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- builtins.dict
- typing.Generic
Static methods
def fetch(id, cache, isidict=False) ‑> Optional[FrozenIdict]-
Expand source code
@staticmethod def fetch(id, cache, isidict=False) -> Union["FrozenIdict", None]: caches = cache if isinstance(cache, list) else [cache] while id not in (cache := caches.pop(0)): if not caches: raise Exception(f"id '{id}' not found in the provided cache {cache.keys()}.") obj = cache[id] if isinstance(obj, dict): if "_ids" not in obj: raise Exception(f"Wrong content for idict under id {id}: missing '_ids' fields ({obj.keys()}).") isidict = True elif isidict: raise Exception(f"Wrong content for idict expected under id {id}: {type(obj)}.") # TODO: adopt lazy fetch: CacheableiVal as 'return' value if isidict: ids = obj["_ids"] data = {} for k, v in ids.items(): data[k] = FrozenIdict.fetch(v, cache) return FrozenIdict.fromdict(data, ids) return obj def fromdict(dictionary, ids)-
Build a frozenidict from values and pre-defined ids
Expand source code
@staticmethod def fromdict(dictionary, ids): """Build a frozenidict from values and pre-defined ids""" data = {} for k, v in dictionary.items(): if isinstance(v, iVal): if k in ids and ids[k] != v.id: # pragma: no cover raise Exception(f"Conflicting ids provided for key '{k}': ival.id={v.id}; ids[{k}]={ids[k]}") data[k] = v else: data[k] = StrictiVal(v, ids[k]) return FrozenIdict(data) def fromid(id, cache) ‑> Optional[FrozenIdict]-
Expand source code
@staticmethod def fromid(id, cache) -> Union["FrozenIdict", None]: return FrozenIdict.fetch(id, cache, isidict=True)
Instance variables
var asdict-
Expand source code
def __get__(self, instance, owner=None): if instance is None: return self if self.attrname is None: raise TypeError( "Cannot use cached_property instance without calling __set_name__ on it.") try: cache = instance.__dict__ except AttributeError: # not all objects have __dict__ (e.g. class defines slots) msg = ( f"No '__dict__' attribute on {type(instance).__name__!r} " f"instance to cache {self.attrname!r} property." ) raise TypeError(msg) from None val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: with self.lock: # check if another thread filled cache while we awaited lock val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: val = self.func(instance) try: cache[self.attrname] = val except TypeError: msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " f"does not support item assignment for caching {self.attrname!r} property." ) raise TypeError(msg) from None return val var asdicts-
Expand source code
def __get__(self, instance, owner=None): if instance is None: return self if self.attrname is None: raise TypeError( "Cannot use cached_property instance without calling __set_name__ on it.") try: cache = instance.__dict__ except AttributeError: # not all objects have __dict__ (e.g. class defines slots) msg = ( f"No '__dict__' attribute on {type(instance).__name__!r} " f"instance to cache {self.attrname!r} property." ) raise TypeError(msg) from None val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: with self.lock: # check if another thread filled cache while we awaited lock val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: val = self.func(instance) try: cache[self.attrname] = val except TypeError: msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " f"does not support item assignment for caching {self.attrname!r} property." ) raise TypeError(msg) from None return val var asdicts_hoshes_noneval-
Expand source code
def __get__(self, instance, owner=None): if instance is None: return self if self.attrname is None: raise TypeError( "Cannot use cached_property instance without calling __set_name__ on it.") try: cache = instance.__dict__ except AttributeError: # not all objects have __dict__ (e.g. class defines slots) msg = ( f"No '__dict__' attribute on {type(instance).__name__!r} " f"instance to cache {self.attrname!r} property." ) raise TypeError(msg) from None val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: with self.lock: # check if another thread filled cache while we awaited lock val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: val = self.func(instance) try: cache[self.attrname] = val except TypeError: msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " f"does not support item assignment for caching {self.attrname!r} property." ) raise TypeError(msg) from None return val var aslist-
Expand source code
def __get__(self, instance, owner=None): if instance is None: return self if self.attrname is None: raise TypeError( "Cannot use cached_property instance without calling __set_name__ on it.") try: cache = instance.__dict__ except AttributeError: # not all objects have __dict__ (e.g. class defines slots) msg = ( f"No '__dict__' attribute on {type(instance).__name__!r} " f"instance to cache {self.attrname!r} property." ) raise TypeError(msg) from None val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: with self.lock: # check if another thread filled cache while we awaited lock val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: val = self.func(instance) try: cache[self.attrname] = val except TypeError: msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " f"does not support item assignment for caching {self.attrname!r} property." ) raise TypeError(msg) from None return val var evaluated-
Expand source code
@property def evaluated(self): if self._evaluated is None: self._evaluated = self.evaluate() return self var fields-
Expand source code
def __get__(self, instance, owner=None): if instance is None: return self if self.attrname is None: raise TypeError( "Cannot use cached_property instance without calling __set_name__ on it.") try: cache = instance.__dict__ except AttributeError: # not all objects have __dict__ (e.g. class defines slots) msg = ( f"No '__dict__' attribute on {type(instance).__name__!r} " f"instance to cache {self.attrname!r} property." ) raise TypeError(msg) from None val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: with self.lock: # check if another thread filled cache while we awaited lock val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: val = self.func(instance) try: cache[self.attrname] = val except TypeError: msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " f"does not support item assignment for caching {self.attrname!r} property." ) raise TypeError(msg) from None return val var metafields-
List of keys which start with '_'
Expand source code
def __get__(self, instance, owner=None): if instance is None: return self if self.attrname is None: raise TypeError( "Cannot use cached_property instance without calling __set_name__ on it.") try: cache = instance.__dict__ except AttributeError: # not all objects have __dict__ (e.g. class defines slots) msg = ( f"No '__dict__' attribute on {type(instance).__name__!r} " f"instance to cache {self.attrname!r} property." ) raise TypeError(msg) from None val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: with self.lock: # check if another thread filled cache while we awaited lock val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: val = self.func(instance) try: cache[self.attrname] = val except TypeError: msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " f"does not support item assignment for caching {self.attrname!r} property." ) raise TypeError(msg) from None return val var unfrozen-
Expand source code
@property def unfrozen(self): from hoshmap.idict import Idict return Idict(_frozen=self)
Methods
def astext(self, colored=True, key_quotes=False)-
Textual representation of a frozenidict object
Expand source code
def astext(self, colored=True, key_quotes=False): r"""Textual representation of a frozenidict object""" dicts, hoshes = self.asdicts_hoshes_noneval txt = json.dumps(dicts, indent=4, ensure_ascii=False, cls=CustomJSONEncoder) # Put colors after json, to avoid escaping ansi codes. if colored: for h in hoshes: txt = txt.replace(f'"{h.id}"', h.idc) txt = re.sub(r'(": )"(λ.+?)"(?=,\n)', '": \\2', txt) if not key_quotes: txt = re.sub(r'(?<!: )"([a-zA-Z0-9_ ]+?)"(?=: )', "\\1", txt) return txt def copy(self)-
D.copy() -> a shallow copy of D
Expand source code
def copy(self): # pragma: no cover raise Exception("A FrozenIdict doesn't need copies") def entries(self, evaluate=True)-
Iterator over all items
Expand source code
def entries(self, evaluate=True): """Iterator over all items""" yield from self.items(evaluate) yield from self.metaitems(evaluate) def evaluate(self)-
Expand source code
def evaluate(self): for k, ival in self.data.items(): if k not in ["_id", "_ids"]: ival.evaluate() return self def items(self, evaluate=True)-
Generator over field-value pairs
Expand source code
def items(self, evaluate=True): """Generator over field-value pairs""" for k, ival in self.data.items(): if not k.startswith("_"): yield k, (ival.value if evaluate else ival) def keys(self)-
Generator of keys which don't start with '_'
Expand source code
def keys(self): """Generator of keys which don't start with '_'""" return (k for k in self.data if not k.startswith("_")) def metaitems(self, evaluate=True)-
Generator over field-value pairs
Expand source code
def metaitems(self, evaluate=True): """Generator over field-value pairs""" for k, ival in self.data.items(): if k.startswith("_") and k not in ["_id", "_ids"]: yield k, (ival.value if evaluate else ival) def metakeys(self)-
Generator of keys which start with '_'
Expand source code
def metakeys(self): """Generator of keys which start with '_'""" return (k for k in self.data if k.startswith("_") and k not in ["_id", "_ids"]) def metavalues(self, evaluate=True)-
Generator of field values (keys don't start with '_')
Expand source code
def metavalues(self, evaluate=True): """Generator of field values (keys don't start with '_')""" return ((v.value if evaluate else v) for k, v in self.data.items() if k.startswith("_") and k not in ["_id", "_ids"]) def show(self, colored=True, key_quotes=False)-
Print textual representation of a frozenidict object
Expand source code
def show(self, colored=True, key_quotes=False): r"""Print textual representation of a frozenidict object""" print(self.astext(colored, key_quotes)) def values(self, evaluate=True)-
Generator of field values (keys that don't start with '_')
Expand source code
def values(self, evaluate=True): """Generator of field values (keys that don't start with '_')""" return ((v.value if evaluate else v) for k, v in self.data.items() if not k.startswith("_"))