123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634 |
- r"""This is rule-based deduction system for SymPy
- The whole thing is split into two parts
- - rules compilation and preparation of tables
- - runtime inference
- For rule-based inference engines, the classical work is RETE algorithm [1],
- [2] Although we are not implementing it in full (or even significantly)
- it's still worth a read to understand the underlying ideas.
- In short, every rule in a system of rules is one of two forms:
- - atom -> ... (alpha rule)
- - And(atom1, atom2, ...) -> ... (beta rule)
- The major complexity is in efficient beta-rules processing and usually for an
- expert system a lot of effort goes into code that operates on beta-rules.
- Here we take minimalistic approach to get something usable first.
- - (preparation) of alpha- and beta- networks, everything except
- - (runtime) FactRules.deduce_all_facts
- _____________________________________
- ( Kirr: I've never thought that doing )
- ( logic stuff is that difficult... )
- -------------------------------------
- o ^__^
- o (oo)\_______
- (__)\ )\/\
- ||----w |
- || ||
- Some references on the topic
- ----------------------------
- [1] https://en.wikipedia.org/wiki/Rete_algorithm
- [2] http://reports-archive.adm.cs.cmu.edu/anon/1995/CMU-CS-95-113.pdf
- https://en.wikipedia.org/wiki/Propositional_formula
- https://en.wikipedia.org/wiki/Inference_rule
- https://en.wikipedia.org/wiki/List_of_rules_of_inference
- """
- from collections import defaultdict
- from typing import Iterator
- from .logic import Logic, And, Or, Not
- def _base_fact(atom):
- """Return the literal fact of an atom.
- Effectively, this merely strips the Not around a fact.
- """
- if isinstance(atom, Not):
- return atom.arg
- else:
- return atom
- def _as_pair(atom):
- if isinstance(atom, Not):
- return (atom.arg, False)
- else:
- return (atom, True)
- # XXX this prepares forward-chaining rules for alpha-network
- def transitive_closure(implications):
- """
- Computes the transitive closure of a list of implications
- Uses Warshall's algorithm, as described at
- http://www.cs.hope.edu/~cusack/Notes/Notes/DiscreteMath/Warshall.pdf.
- """
- full_implications = set(implications)
- literals = set().union(*map(set, full_implications))
- for k in literals:
- for i in literals:
- if (i, k) in full_implications:
- for j in literals:
- if (k, j) in full_implications:
- full_implications.add((i, j))
- return full_implications
- def deduce_alpha_implications(implications):
- """deduce all implications
- Description by example
- ----------------------
- given set of logic rules:
- a -> b
- b -> c
- we deduce all possible rules:
- a -> b, c
- b -> c
- implications: [] of (a,b)
- return: {} of a -> set([b, c, ...])
- """
- implications = implications + [(Not(j), Not(i)) for (i, j) in implications]
- res = defaultdict(set)
- full_implications = transitive_closure(implications)
- for a, b in full_implications:
- if a == b:
- continue # skip a->a cyclic input
- res[a].add(b)
- # Clean up tautologies and check consistency
- for a, impl in res.items():
- impl.discard(a)
- na = Not(a)
- if na in impl:
- raise ValueError(
- 'implications are inconsistent: %s -> %s %s' % (a, na, impl))
- return res
- def apply_beta_to_alpha_route(alpha_implications, beta_rules):
- """apply additional beta-rules (And conditions) to already-built
- alpha implication tables
- TODO: write about
- - static extension of alpha-chains
- - attaching refs to beta-nodes to alpha chains
- e.g.
- alpha_implications:
- a -> [b, !c, d]
- b -> [d]
- ...
- beta_rules:
- &(b,d) -> e
- then we'll extend a's rule to the following
- a -> [b, !c, d, e]
- """
- x_impl = {}
- for x in alpha_implications.keys():
- x_impl[x] = (set(alpha_implications[x]), [])
- for bcond, bimpl in beta_rules:
- for bk in bcond.args:
- if bk in x_impl:
- continue
- x_impl[bk] = (set(), [])
- # static extensions to alpha rules:
- # A: x -> a,b B: &(a,b) -> c ==> A: x -> a,b,c
- seen_static_extension = True
- while seen_static_extension:
- seen_static_extension = False
- for bcond, bimpl in beta_rules:
- if not isinstance(bcond, And):
- raise TypeError("Cond is not And")
- bargs = set(bcond.args)
- for x, (ximpls, bb) in x_impl.items():
- x_all = ximpls | {x}
- # A: ... -> a B: &(...) -> a is non-informative
- if bimpl not in x_all and bargs.issubset(x_all):
- ximpls.add(bimpl)
- # we introduced new implication - now we have to restore
- # completeness of the whole set.
- bimpl_impl = x_impl.get(bimpl)
- if bimpl_impl is not None:
- ximpls |= bimpl_impl[0]
- seen_static_extension = True
- # attach beta-nodes which can be possibly triggered by an alpha-chain
- for bidx, (bcond, bimpl) in enumerate(beta_rules):
- bargs = set(bcond.args)
- for x, (ximpls, bb) in x_impl.items():
- x_all = ximpls | {x}
- # A: ... -> a B: &(...) -> a (non-informative)
- if bimpl in x_all:
- continue
- # A: x -> a... B: &(!a,...) -> ... (will never trigger)
- # A: x -> a... B: &(...) -> !a (will never trigger)
- if any(Not(xi) in bargs or Not(xi) == bimpl for xi in x_all):
- continue
- if bargs & x_all:
- bb.append(bidx)
- return x_impl
- def rules_2prereq(rules):
- """build prerequisites table from rules
- Description by example
- ----------------------
- given set of logic rules:
- a -> b, c
- b -> c
- we build prerequisites (from what points something can be deduced):
- b <- a
- c <- a, b
- rules: {} of a -> [b, c, ...]
- return: {} of c <- [a, b, ...]
- Note however, that this prerequisites may be *not* enough to prove a
- fact. An example is 'a -> b' rule, where prereq(a) is b, and prereq(b)
- is a. That's because a=T -> b=T, and b=F -> a=F, but a=F -> b=?
- """
- prereq = defaultdict(set)
- for (a, _), impl in rules.items():
- if isinstance(a, Not):
- a = a.args[0]
- for (i, _) in impl:
- if isinstance(i, Not):
- i = i.args[0]
- prereq[i].add(a)
- return prereq
- ################
- # RULES PROVER #
- ################
- class TautologyDetected(Exception):
- """(internal) Prover uses it for reporting detected tautology"""
- pass
- class Prover:
- """ai - prover of logic rules
- given a set of initial rules, Prover tries to prove all possible rules
- which follow from given premises.
- As a result proved_rules are always either in one of two forms: alpha or
- beta:
- Alpha rules
- -----------
- This are rules of the form::
- a -> b & c & d & ...
- Beta rules
- ----------
- This are rules of the form::
- &(a,b,...) -> c & d & ...
- i.e. beta rules are join conditions that say that something follows when
- *several* facts are true at the same time.
- """
- def __init__(self):
- self.proved_rules = []
- self._rules_seen = set()
- def split_alpha_beta(self):
- """split proved rules into alpha and beta chains"""
- rules_alpha = [] # a -> b
- rules_beta = [] # &(...) -> b
- for a, b in self.proved_rules:
- if isinstance(a, And):
- rules_beta.append((a, b))
- else:
- rules_alpha.append((a, b))
- return rules_alpha, rules_beta
- @property
- def rules_alpha(self):
- return self.split_alpha_beta()[0]
- @property
- def rules_beta(self):
- return self.split_alpha_beta()[1]
- def process_rule(self, a, b):
- """process a -> b rule""" # TODO write more?
- if (not a) or isinstance(b, bool):
- return
- if isinstance(a, bool):
- return
- if (a, b) in self._rules_seen:
- return
- else:
- self._rules_seen.add((a, b))
- # this is the core of processing
- try:
- self._process_rule(a, b)
- except TautologyDetected:
- pass
- def _process_rule(self, a, b):
- # right part first
- # a -> b & c --> a -> b ; a -> c
- # (?) FIXME this is only correct when b & c != null !
- if isinstance(b, And):
- sorted_bargs = sorted(b.args, key=str)
- for barg in sorted_bargs:
- self.process_rule(a, barg)
- # a -> b | c --> !b & !c -> !a
- # --> a & !b -> c
- # --> a & !c -> b
- elif isinstance(b, Or):
- sorted_bargs = sorted(b.args, key=str)
- # detect tautology first
- if not isinstance(a, Logic): # Atom
- # tautology: a -> a|c|...
- if a in sorted_bargs:
- raise TautologyDetected(a, b, 'a -> a|c|...')
- self.process_rule(And(*[Not(barg) for barg in b.args]), Not(a))
- for bidx in range(len(sorted_bargs)):
- barg = sorted_bargs[bidx]
- brest = sorted_bargs[:bidx] + sorted_bargs[bidx + 1:]
- self.process_rule(And(a, Not(barg)), Or(*brest))
- # left part
- # a & b -> c --> IRREDUCIBLE CASE -- WE STORE IT AS IS
- # (this will be the basis of beta-network)
- elif isinstance(a, And):
- sorted_aargs = sorted(a.args, key=str)
- if b in sorted_aargs:
- raise TautologyDetected(a, b, 'a & b -> a')
- self.proved_rules.append((a, b))
- # XXX NOTE at present we ignore !c -> !a | !b
- elif isinstance(a, Or):
- sorted_aargs = sorted(a.args, key=str)
- if b in sorted_aargs:
- raise TautologyDetected(a, b, 'a | b -> a')
- for aarg in sorted_aargs:
- self.process_rule(aarg, b)
- else:
- # both `a` and `b` are atoms
- self.proved_rules.append((a, b)) # a -> b
- self.proved_rules.append((Not(b), Not(a))) # !b -> !a
- ########################################
- class FactRules:
- """Rules that describe how to deduce facts in logic space
- When defined, these rules allow implications to quickly be determined
- for a set of facts. For this precomputed deduction tables are used.
- see `deduce_all_facts` (forward-chaining)
- Also it is possible to gather prerequisites for a fact, which is tried
- to be proven. (backward-chaining)
- Definition Syntax
- -----------------
- a -> b -- a=T -> b=T (and automatically b=F -> a=F)
- a -> !b -- a=T -> b=F
- a == b -- a -> b & b -> a
- a -> b & c -- a=T -> b=T & c=T
- # TODO b | c
- Internals
- ---------
- .full_implications[k, v]: all the implications of fact k=v
- .beta_triggers[k, v]: beta rules that might be triggered when k=v
- .prereq -- {} k <- [] of k's prerequisites
- .defined_facts -- set of defined fact names
- """
- def __init__(self, rules):
- """Compile rules into internal lookup tables"""
- if isinstance(rules, str):
- rules = rules.splitlines()
- # --- parse and process rules ---
- P = Prover()
- for rule in rules:
- # XXX `a` is hardcoded to be always atom
- a, op, b = rule.split(None, 2)
- a = Logic.fromstring(a)
- b = Logic.fromstring(b)
- if op == '->':
- P.process_rule(a, b)
- elif op == '==':
- P.process_rule(a, b)
- P.process_rule(b, a)
- else:
- raise ValueError('unknown op %r' % op)
- # --- build deduction networks ---
- self.beta_rules = []
- for bcond, bimpl in P.rules_beta:
- self.beta_rules.append(
- ({_as_pair(a) for a in bcond.args}, _as_pair(bimpl)))
- # deduce alpha implications
- impl_a = deduce_alpha_implications(P.rules_alpha)
- # now:
- # - apply beta rules to alpha chains (static extension), and
- # - further associate beta rules to alpha chain (for inference
- # at runtime)
- impl_ab = apply_beta_to_alpha_route(impl_a, P.rules_beta)
- # extract defined fact names
- self.defined_facts = {_base_fact(k) for k in impl_ab.keys()}
- # build rels (forward chains)
- full_implications = defaultdict(set)
- beta_triggers = defaultdict(set)
- for k, (impl, betaidxs) in impl_ab.items():
- full_implications[_as_pair(k)] = {_as_pair(i) for i in impl}
- beta_triggers[_as_pair(k)] = betaidxs
- self.full_implications = full_implications
- self.beta_triggers = beta_triggers
- # build prereq (backward chains)
- prereq = defaultdict(set)
- rel_prereq = rules_2prereq(full_implications)
- for k, pitems in rel_prereq.items():
- prereq[k] |= pitems
- self.prereq = prereq
- def _to_python(self) -> str:
- """ Generate a string with plain python representation of the instance """
- return '\n'.join(self.print_rules())
- @classmethod
- def _from_python(cls, data : dict):
- """ Generate an instance from the plain python representation """
- self = cls('')
- for key in ['full_implications', 'beta_triggers', 'prereq']:
- d=defaultdict(set)
- d.update(data[key])
- setattr(self, key, d)
- self.beta_rules = data['beta_rules']
- self.defined_facts = set(data['defined_facts'])
- return self
- def _defined_facts_lines(self):
- yield 'defined_facts = ['
- for fact in sorted(self.defined_facts):
- yield f' {fact!r},'
- yield '] # defined_facts'
- def _full_implications_lines(self):
- yield 'full_implications = dict( ['
- for fact in sorted(self.defined_facts):
- for value in (True, False):
- yield f' # Implications of {fact} = {value}:'
- yield f' (({fact!r}, {value!r}), set( ('
- implications = self.full_implications[(fact, value)]
- for implied in sorted(implications):
- yield f' {implied!r},'
- yield ' ) ),'
- yield ' ),'
- yield ' ] ) # full_implications'
- def _prereq_lines(self):
- yield 'prereq = {'
- yield ''
- for fact in sorted(self.prereq):
- yield f' # facts that could determine the value of {fact}'
- yield f' {fact!r}: {{'
- for pfact in sorted(self.prereq[fact]):
- yield f' {pfact!r},'
- yield ' },'
- yield ''
- yield '} # prereq'
- def _beta_rules_lines(self):
- reverse_implications = defaultdict(list)
- for n, (pre, implied) in enumerate(self.beta_rules):
- reverse_implications[implied].append((pre, n))
- yield '# Note: the order of the beta rules is used in the beta_triggers'
- yield 'beta_rules = ['
- yield ''
- m = 0
- indices = {}
- for implied in sorted(reverse_implications):
- fact, value = implied
- yield f' # Rules implying {fact} = {value}'
- for pre, n in reverse_implications[implied]:
- indices[n] = m
- m += 1
- setstr = ", ".join(map(str, sorted(pre)))
- yield f' ({{{setstr}}},'
- yield f' {implied!r}),'
- yield ''
- yield '] # beta_rules'
- yield 'beta_triggers = {'
- for query in sorted(self.beta_triggers):
- fact, value = query
- triggers = [indices[n] for n in self.beta_triggers[query]]
- yield f' {query!r}: {triggers!r},'
- yield '} # beta_triggers'
- def print_rules(self) -> Iterator[str]:
- """ Returns a generator with lines to represent the facts and rules """
- yield from self._defined_facts_lines()
- yield ''
- yield ''
- yield from self._full_implications_lines()
- yield ''
- yield ''
- yield from self._prereq_lines()
- yield ''
- yield ''
- yield from self._beta_rules_lines()
- yield ''
- yield ''
- yield "generated_assumptions = {'defined_facts': defined_facts, 'full_implications': full_implications,"
- yield " 'prereq': prereq, 'beta_rules': beta_rules, 'beta_triggers': beta_triggers}"
- class InconsistentAssumptions(ValueError):
- def __str__(self):
- kb, fact, value = self.args
- return "%s, %s=%s" % (kb, fact, value)
- class FactKB(dict):
- """
- A simple propositional knowledge base relying on compiled inference rules.
- """
- def __str__(self):
- return '{\n%s}' % ',\n'.join(
- ["\t%s: %s" % i for i in sorted(self.items())])
- def __init__(self, rules):
- self.rules = rules
- def _tell(self, k, v):
- """Add fact k=v to the knowledge base.
- Returns True if the KB has actually been updated, False otherwise.
- """
- if k in self and self[k] is not None:
- if self[k] == v:
- return False
- else:
- raise InconsistentAssumptions(self, k, v)
- else:
- self[k] = v
- return True
- # *********************************************
- # * This is the workhorse, so keep it *fast*. *
- # *********************************************
- def deduce_all_facts(self, facts):
- """
- Update the KB with all the implications of a list of facts.
- Facts can be specified as a dictionary or as a list of (key, value)
- pairs.
- """
- # keep frequently used attributes locally, so we'll avoid extra
- # attribute access overhead
- full_implications = self.rules.full_implications
- beta_triggers = self.rules.beta_triggers
- beta_rules = self.rules.beta_rules
- if isinstance(facts, dict):
- facts = facts.items()
- while facts:
- beta_maytrigger = set()
- # --- alpha chains ---
- for k, v in facts:
- if not self._tell(k, v) or v is None:
- continue
- # lookup routing tables
- for key, value in full_implications[k, v]:
- self._tell(key, value)
- beta_maytrigger.update(beta_triggers[k, v])
- # --- beta chains ---
- facts = []
- for bidx in beta_maytrigger:
- bcond, bimpl = beta_rules[bidx]
- if all(self.get(k) is v for k, v in bcond):
- facts.append(bimpl)
|