# -*- coding: utf-8 -*- # Copyright (C) 2014-2021 Laboratoire de # Recherche et Développement de l'Epita (LRDE). # # This file is part of Spot, a model checking library. # # Spot is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # Spot is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public # License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import sys if sys.hexversion < 0x03030000: sys.exit("This module requires Python 3.3 or newer") import subprocess import os import signal import tempfile from contextlib import suppress as _supress if 'SPOT_UNINSTALLED' in os.environ: # When Spot is installed, _impl.so will be in the same directory as # spot/impl.py, however when we run Spot's test suite, Spot is not yet # installed and we want "from . import _impl" (generated by Swig4) to look # into .libs/ __path__.extend(sys.path) # We may have third-party plugins that want to be loaded as "spot.xxx", but # that are installed in a different $prefix. This sets things so that any # file that looks like spot-extra/xxx.py can be loaded with "import spot.xxx". for path in sys.path: if path not in __path__: path += "/spot-extra" if os.path.isdir(path): __path__.append(path) from spot.impl import * # spot.aux_ used to be called spot.aux until the filename aux.py # caused issues on Windows. So the file is now named aux_.py, but we # still want to import it as spot.aux, hence we add it to spot.modules # as an alias. import spot.aux_ as aux sys.modules['spot.aux'] = aux from spot.aux import \ extend as _extend, \ str_to_svg as _str_to_svg, \ ostream_to_svg as _ostream_to_svg # The parrameters used by default when show() is called on an automaton. _show_default = None def setup(**kwargs): """Configure Spot for fancy display. This is manly useful in Jupyter/IPython. Note that this function needs to be called before any automaton is displayed. Afterwards it will have no effect (you should restart Python, or the Jupyter/IPython Kernel). Parameters ---------- bullets : bool whether to display acceptance conditions as UTF8 bullets (default: True) fillcolor : str the color to use for states (default: '#ffffaa') size : str the width and height of the GraphViz output in inches (default: '10.2,5') font : str the font to use in the GraphViz output (default: 'Lato') show_default : str default options for show() max_states : int maximum number of states in GraphViz output (default: 50) """ import os s = ('size="{}" edge[arrowhead=vee, arrowsize=.7]') os.environ['SPOT_DOTEXTRA'] = s.format(kwargs.get('size', '10.13,5')) bullets = 'B' if kwargs.get('bullets', True) else '' max_states = '<' + str(kwargs.get('max_states', 50)) d = 'rf({})C({}){}'.format(kwargs.get('font', 'Lato'), kwargs.get('fillcolor', '#ffffaa'), bullets + max_states) global _show_default _show_default = kwargs.get('show_default', None) os.environ['SPOT_DOTDEFAULT'] = d # In version 3.0.2, Swig puts strongly typed enum in the main # namespace without prefixing them. Latter versions fix this. So we # can remove for following hack once 3.0.2 is no longer used in our # build farm. if 'op_ff' not in globals(): for i in ('ff', 'tt', 'eword', 'ap', 'Not', 'X', 'F', 'G', 'Closure', 'NegClosure', 'NegClosureMarked', 'Xor', 'Implies', 'Equiv', 'U', 'R', 'W', 'M', 'EConcat', 'EConcatMarked', 'UConcat', 'Or', 'OrRat', 'And', 'AndRat', 'AndNLM', 'Concat', 'Fusion', 'Star', 'FStar'): globals()['op_' + i] = globals()[i] del globals()[i] # Global BDD dict so that we do not have to create one in user code. _bdd_dict = make_bdd_dict() __om_init_tmp = option_map.__init__ def __om_init_new(self, str=None): __om_init_tmp(self) if str: res = self.parse_options(str) if res: raise RuntimeError("failed to parse option at: '" + str + "'") option_map.__init__ = __om_init_new @_extend(aig) class aig: def _repr_svg_(self, opt=None): ostr = ostringstream() print_dot(ostr, self, opt) return _ostream_to_svg(ostr) def show(self, opt=None): from spot.jupyter import SVG return SVG(self._repr_svg_(opt)) def to_str(self, format='circuit', opt=None): format = format.lower() if format == 'circuit': ostr = ostringstream() print_aiger(ostr, self) return ostr.str() if format == 'dot': ostr = ostringstream() print_dot(ostr, self, opt) return ostr.str() raise ValueError("unknown string format: " + format) def evaluate(self, gen): self.circ_init() outputs_pos = self.outputs() out_names = self.output_names() for x in gen: if len(x) != self.num_inputs(): raise ValueError("Incorrect number of inputs") self.circ_step(x) values = self.circ_state() res_val = [values[index] for index in outputs_pos] assert(len(res_val) == len(out_names)) yield list(zip(out_names, res_val)) __twa__acc1 = twa.acc __twa__acc2 = twa.get_acceptance # We store the automaton into the acceptance condition # returned by acc so that it does not crash when # taking the reference to a temporary, as in # spot.translate('Foo').acc() # Issue #468. def __twa_acc1_tmp(self): a = __twa__acc1(self) a._aut = self return a def __twa_acc2_tmp(self): a = __twa__acc2(self) a._aut = self return a twa.acc = __twa_acc1_tmp twa.get_acceptance = __twa_acc2_tmp @_extend(twa, ta) class twa: def _repr_svg_(self, opt=None): """Output the automaton as SVG""" ostr = ostringstream() if opt is None: global _show_default opt = _show_default print_dot(ostr, self, opt) return _ostream_to_svg(ostr) def show(self, opt=None): """Display the automaton as SVG, in the IPython/Jupyter notebook""" if opt is None: global _show_default opt = _show_default # Load the SVG function only if we need it. This way the # bindings can still be used outside of IPython if IPython is # not installed. from spot.jupyter import SVG return SVG(self._repr_svg_(opt)) def highlight_states(self, states, color): """Highlight a list of states. This can be a list of state numbers, or a list of Booleans.""" for idx, val in enumerate(states): if type(val) is bool: if val: self.highlight_state(idx, color) else: self.highlight_state(val, color) return self def highlight_edges(self, edges, color): """Highlight a list of edges. This can be a list of edge numbers, or a list of Booleans.""" for idx, val in enumerate(edges): if type(val) is bool: if val: self.highlight_edge(idx, color) else: self.highlight_edge(val, color) return self @_extend(twa) class twa: def to_str(a, format='hoa', opt=None): format = format.lower() if format == 'hoa': ostr = ostringstream() print_hoa(ostr, a, opt) return ostr.str() if format == 'dot': ostr = ostringstream() print_dot(ostr, a, opt) return ostr.str() if format == 'spin': ostr = ostringstream() print_never_claim(ostr, a, opt) return ostr.str() if format == 'lbtt': ostr = ostringstream() print_lbtt(ostr, a, opt) return ostr.str() raise ValueError("unknown string format: " + format) def save(a, filename, format='hoa', opt=None, append=False): with open(filename, 'a' if append else 'w') as f: s = a.to_str(format, opt) f.write(s) if s[-1] != '\n': f.write('\n') return a @_extend(twa_graph) class twa_graph: def show_storage(self, opt=None): ostr = ostringstream() self.dump_storage_as_dot(ostr, opt) from spot.jupyter import SVG return SVG(_ostream_to_svg(ostr)) def __copy__(self): return make_twa_graph(self, twa_prop_set.all(), True) def make_twa_graph(*args): from spot.impl import make_twa_graph as mtg if len(args) == 0: return mtg(_bdd_dict) return mtg(*args) @_extend(formula) class formula: def __init__(self, str): """Parse the given string to create a formula.""" if type(str) == formula: self.this = str else: self.this = parse_formula(str) def show_ast(self): """Display the syntax tree of the formula.""" # Load the SVG function only if we need it. This way the bindings # can still be used outside of IPython if IPython is not # installed. from spot.jupyter import SVG return SVG(_str_to_svg(self.to_str('d'))) def _repr_latex_(self): return '$' + self.to_str('j') + '$' def to_str(self, format='spot', parenth=False): if format == 'spot' or format == 'f': return str_psl(self, parenth) elif format == 'spin' or format == 's': return str_spin_ltl(self, parenth) elif format == 'utf8' or format == '8': return str_utf8_psl(self, parenth) elif format == 'lbt' or format == 'l': return str_lbt_ltl(self) elif format == 'wring' or format == 'w': return str_wring_ltl(self) elif format == 'latex' or format == 'x': return str_latex_psl(self, parenth) elif format == 'sclatex' or format == 'X': return str_sclatex_psl(self, parenth) elif format == 'mathjax' or format == 'j': return (str_sclatex_psl(self, parenth). replace("``", "\\unicode{x201C}"). replace("\\textrm{''}", "\\unicode{x201D}")) elif format == 'dot' or format == 'd': ostr = ostringstream() print_dot_psl(ostr, self) return ostr.str().encode('utf-8') else: raise ValueError("unknown string format: " + format) def __format__(self, spec): """Format the formula according to `spec`. Parameters ---------- spec : str, optional a list of letters that specify how the formula should be formatted. Supported specifiers -------------------- - 'f': use Spot's syntax (default) - '8': use Spot's syntax in UTF-8 mode - 's': use Spin's syntax - 'l': use LBT's syntax - 'w': use Wring's syntax - 'x': use LaTeX output - 'X': use self-contained LaTeX output - 'j': use self-contained LaTeX output, adjusted for MathJax Add some of those letters for additional options: - 'p': use full parentheses - 'c': escape the formula for CSV output (this will enclose the formula in double quotes, and escape any included double quotes) - 'h': escape the formula for HTML output - 'd': escape double quotes and backslash, for use in C-strings (the outermost double quotes are *not* added) - 'q': quote and escape for shell output, using single quotes or double quotes depending on the contents. - '[...]': rewrite away all the operators specified in brackets, using spot.unabbreviate(). - ':spec': pass the remaining specification to the formating function for strings. """ syntax = 'f' parent = False escape = None form = self while spec: c, spec = spec[0], spec[1:] if c in ('f', 's', '8', 'l', 'w', 'x', 'X', 'j'): syntax = c elif c == 'p': parent = True elif c in ('c', 'd', 'h', 'q'): escape = c elif c == ':': break elif c == '[': pos = spec.find(']') if pos < 0: raise ValueError("unclosed bracket: [" + spec) form = form.unabbreviate(spec[0:pos]) spec = spec[pos+1:] else: raise ValueError("unknown format specification: " + c + spec) s = form.to_str(syntax, parent) if escape == 'c': o = ostringstream() escape_rfc4180(o, s) s = '"' + o.str() + '"' elif escape == 'd': s = escape_str(s) elif escape == 'h': o = ostringstream() escape_html(o, s) s = o.str() elif escape == 'q': o = ostringstream() quote_shell_string(o, s) s = o.str() return s.__format__(spec) def traverse(self, func, *args): if func(self, *args): return for f in self: f.traverse(func, *args) def map(self, func, *args): k = self.kind() if k in (op_ff, op_tt, op_eword, op_ap): return self if k in (op_Not, op_X, op_F, op_G, op_Closure, op_NegClosure, op_NegClosureMarked): return formula.unop(k, func(self[0], *args)) if k in (op_Xor, op_Implies, op_Equiv, op_U, op_R, op_W, op_M, op_EConcat, op_EConcatMarked, op_UConcat): return formula.binop(k, func(self[0], *args), func(self[1], *args)) if k in (op_Or, op_OrRat, op_And, op_AndRat, op_AndNLM, op_Concat, op_Fusion): return formula.multop(k, [func(x, *args) for x in self]) if k in (op_Star, op_FStar): return formula.bunop(k, func(self[0], *args), self.min(), self.max()) raise ValueError("unknown type of formula") @_extend(atomic_prop_set) class atomic_prop_set: def _repr_latex_(self): res = '$\{' comma = '' for ap in self: apname = ap.to_str('j') if not '\\unicode{' in apname: apname = "\\unicode{x201C}" + apname + "\\unicode{x201D}" res += comma comma = ', ' res += apname res += '\}$' return res @_extend(zielonka_tree) class zielonka_tree: def _repr_svg_(self): """Output the Zielonka tree as SVG""" ostr = ostringstream() self.dot(ostr) return _ostream_to_svg(ostr) _acdnum = 0 @_extend(acd) class acd: def _repr_svg_(self, id=None): """Output the ACD as SVG""" ostr = ostringstream() self.dot(ostr, id) return _ostream_to_svg(ostr) def _repr_html_(self): global _acdnum num = _acdnum _acdnum += 1 style = ''' .acdhigh ellipse,.acdacc ellipse,.acdacc path,.acdacc polygon{stroke:green;} .acdhigh polygon,.acdrej ellipse,.acdrej path,.acdrej polygon{stroke:red;} .acdbold ellipse,.acdbold polygon,.acdbold path{stroke-width:2;} .acdrej polygon{fill:red;} .acdacc polygon{fill:green;} ''' js = ''' function acd{num}_clear(){{ $("#acd{num} .node,#acdaut{num} .node,#acdaut{num} .edge") .removeClass("acdhigh acdbold acdacc acdrej"); }}; function acd{num}_state(state){{ acd{num}_clear(); $("#acd{num} .acdS" + state).addClass("acdhigh acdbold"); $("#acdaut{num} #S" + state).addClass("acdbold"); }}; function acd{num}_edge(edge){{ acd{num}_clear(); var theedge = $('#acdaut{num} #E' + edge) var classList = theedge.attr('class').split(/\s+/); $.each(classList, function(index, item) {{ if (item.startsWith('acdN')) {{ $("#acd{num} #" + item.substring(3)).addClass("acdhigh acdbold"); }} }}); theedge.addClass("acdbold"); }}; function acd{num}_node(node, acc){{ acd{num}_clear(); $("#acdaut{num} .acdN" + node).addClass(acc ? "acdacc acdbold" : "acdrej acdbold"); $("#acd{num} #N" + node).addClass("acdbold acdhigh"); }};'''.format(num=num) me = 0 for n in range(self.node_count()): for e in self.edges_of_node(n): me = max(e, me) js += '$("#acdaut{num} #E{e}").addClass("acdN{n}");'\ .format(num=num, e=e, n=n) for e in range(1, me + 1): js += '$("#acdaut{num} #E{e}")'\ '.click(function(){{acd{num}_edge({e});}});'\ .format(num=num, e=e) for s in range(self.get_aut().num_states()): js += '$("#acdaut{num} #S{s}")'\ '.click(function(){{acd{num}_state({s});}});'\ .format(num=num, s=s) for n in range(self.node_count()): v = int(self.node_acceptance(n)) js += '$("#acd{num} #N{n}")'\ '.click(function(){{acd{num}_node({n}, {v});}});'\ .format(num=num, n=n, v=v) html = '
{}
{}
'\ .format(style, self.get_aut().show('.i(acdaut{})'.format(num)).data, self._repr_svg_("acd{}".format(num)), js); return html def automata(*sources, timeout=None, ignore_abort=True, trust_hoa=True, no_sid=False, debug=False, want_kripke=False): """Read automata from a list of sources. Parameters ---------- *sources : list of str These sources can be either commands (end with `|`), textual representations of automata (contain `\n`), or filenames (else). timeout : int, optional Number of seconds to wait for the result of a command. If None (the default), not limit is used. ignore_abort : bool, optional If True (the default), skip HOA atomata that ends with `--ABORT--`, and return the next automaton in the stream. If False, aborted automata are reported as syntax errors. trust_hoa : bool, optional If True (the default), supported HOA properies that cannot be easily verified are trusted. want_kripke : bool, optional If True, the input is expected to discribe Kripke structures, in the HOA format, and the returned type will be of type kripke_graph_ptr. no_sid : bool, optional When an automaton is obtained from a subprocess, this subprocess is started from a shell with its own session group (the default) unless no_sid is set to True. debug : bool, optional Whether to run the parser in debug mode. Notes ----- The automata can be written in the `HOA format`_, as `never claims`_, in `LBTT's format`_, or in `ltl2dstar's format`_. .. _HOA format: http://adl.github.io/hoaf/ .. _never claims: http://spinroot.com/spin/Man/never.html .. _LBTT's format: http://www.tcs.hut.fi/Software/lbtt/doc/html/Format-for-automata.html .. _ltl2dstar's format: http://www.ltl2dstar.de/docs/ltl2dstar.html#output-format-dstar If an argument ends with a `|`, then this argument is interpreted as a shell command, and the output of that command (without the `|`) is parsed. If an argument contains a newline, then it is interpreted as actual contents to be parsed. Otherwise, the argument is assumed to be a filename. The result of this function is a generator on all the automata objects read from these sources. The typical usage is:: for aut in spot.automata(filename, command, ...): # do something with aut When the source is a command, and no `timeout` is specified, parsing is done straight out of the pipe connecting the command. So for aut in spot.automata('randaut -H -n 10 2 |'): process(aut) will call `process(aut)` on each automaton as soon as it is output by `randaut`, and without waiting for `randaut` to terminate. However if `timeout` is passed, then `automata()` will wait for the entire command to terminate before parsing its entire output. If one command takes more than `timeout` seconds, `subprocess.TimeoutExpired` is raised. If any command terminates with a non-zero error, `subprocess.CalledProcessError` is raised. """ o = automaton_parser_options() o.debug = debug o.ignore_abort = ignore_abort o.trust_hoa = trust_hoa o.raise_errors = True o.want_kripke = want_kripke for filename in sources: try: p = None proc = None if filename[-1] == '|': setsid_maybe = None if not no_sid: setsid_maybe = os.setsid # universal_newlines for str output instead of bytes # when the pipe is read from Python (which happens # when timeout is set). prefn = None if no_sid else os.setsid proc = subprocess.Popen(filename[:-1], shell=True, preexec_fn=prefn, universal_newlines=True, stdout=subprocess.PIPE) if timeout is None: p = automaton_stream_parser(proc.stdout.fileno(), filename, o) else: try: out, err = proc.communicate(timeout=timeout) except subprocess.TimeoutExpired: # Using subprocess.check_output() with timeout # would just kill the shell, not its children. os.killpg(proc.pid, signal.SIGKILL) raise else: ret = proc.wait() if ret: raise subprocess.CalledProcessError(ret, filename[:-1]) finally: proc = None p = automaton_stream_parser(out, filename, o) elif '\n' in filename: p = automaton_stream_parser(filename, "", o) else: p = automaton_stream_parser(filename, o) a = True # Using proc as a context manager ensures that proc.stdout will be # closed on exit, and the process will be properly waited for. # This is important when running tools that produce an infinite # stream of automata and that must be killed once the generator # returned by spot.automata() is destroyed. Otherwise, _supress() # is just a dummy context manager that does nothing (Python 3.7 # introduces nullcontext() for this purpose, but at the time of # writing we support Python 3.4). mgr = proc if proc else _supress() with mgr: while a: # the automaton is None when we reach the end of the file. res = p.parse(_bdd_dict) a = res.ks if want_kripke else res.aut if a: yield a finally: # Make sure we destroy the parser (p) and the subprocess # (prop) in the correct order. del p if proc is not None: ret = proc.returncode del proc # Do not complain about the exit code if we are already raising # an exception. if ret and sys.exc_info()[0] is None: raise subprocess.CalledProcessError(ret, filename[:-1]) # deleting o explicitly now prevents Python 3.5 from # reporting the following error: " returned a result with # an error set". It's not clear to me if the bug is in Python # or Swig. At least it's related to the use of generators. del o return def automaton(filename, **kwargs): """Read a single automaton from a file. See `spot.automata` for a list of supported formats.""" try: return next(automata(filename, **kwargs)) except StopIteration: raise RuntimeError("Failed to read automaton from {}".format(filename)) def aiger_circuits(*sources, bdd_dict = None): """Read aiger circuits from a list of sources. Parameters ---------- *sources : list of str These sources can be either commands (end with `|`), textual representations of the circuit in restricted aag format (start with aag and contain '\n'), or filenames (else). bdd_dict : If not supplied, a fresh bdd_dict will be created, common for all of the circuits. """ bdd_dict = bdd_dict if bdd_dict is not None else make_bdd_dict() for source in sources: if (source.startswith("aag") and '\n' in source): yield aig.parse_aag(source, "", bdd_dict) else: yield aig.parse_aag(source, bdd_dict) def aiger_circuit(source, bdd_dict = None): """Read a single aiger circuit from file or textual representation. See `spot.aiger_circuits` for a list of supported formats.""" try: return next(aiger_circuits(source, bdd_dict = bdd_dict)) except StopIteration: raise RuntimeError("Failed to read an aiger circuit " "from {}".format(source)) def _postproc_translate_options(obj, default_type, *args): type_name_ = None type_ = None pref_name_ = None pref_ = None optm_name_ = None optm_ = None comp_ = 0 unam_ = 0 sbac_ = 0 colo_ = 0 def type_set(val): nonlocal type_, type_name_ if type_ is not None and type_name_ != val: raise ValueError("type cannot be both {} and {}" .format(type_name_, val)) elif val == 'generic' or val == 'gen' or val == 'g': type_ = postprocessor.Generic elif val == 'tgba': # historical type_ = postprocessor.GeneralizedBuchi elif val == 'generalizedbuchi': type_ = postprocessor.GeneralizedBuchi elif val == 'ba': # historical type_ = postprocessor.Buchi sbac_ = postprocessor.SBAcc elif val == 'buchi': type_ = postprocessor.Buchi elif val == 'cobuchi' or val == 'nca': type_ = postprocessor.CoBuchi elif val == 'dca': type_ = postprocessor.CoBuchi pref_ = postprocessor.Deterministic elif val == 'parity min odd': type_ = postprocessor.ParityMinOdd elif val == 'parity min even': type_ = postprocessor.ParityMinEven elif val == 'parity max odd': type_ = postprocessor.ParityMaxOdd elif val == 'parity max even': type_ = postprocessor.ParityMaxEven elif val == 'parity min': type_ = postprocessor.ParityMin elif val == 'parity max': type_ = postprocessor.ParityMax elif val == 'parity odd': type_ = postprocessor.ParityOdd elif val == 'parity even': type_ = postprocessor.ParityEven elif val == 'parity': type_ = postprocessor.Parity else: assert(val == 'monitor') type_ = postprocessor.Monitor type_name_ = val def pref_set(val): nonlocal pref_, pref_name_ if pref_ is not None and pref_name_ != val: raise ValueError("preference cannot be both {} and {}" .format(pref_name_, val)) elif val == 'small': pref_ = postprocessor.Small elif val == 'deterministic': pref_ = postprocessor.Deterministic else: assert(val == 'any') pref_ = postprocessor.Any pref_name_ = val def optm_set(val): nonlocal optm_, optm_name_ if optm_ is not None and optm_name_ != val: raise ValueError("optimization level cannot be both {} and {}" .format(optm_name_, val)) if val == 'high': optm_ = postprocessor.High elif val.startswith('med'): optm_ = postprocessor.Medium else: assert(val == 'low') optm_ = postprocessor.Low optm_name_ = val def misc_set(val): nonlocal comp_, unam_, sbac_, colo_ if val == 'colored': colo_ = postprocessor.Colored elif val == 'complete': comp_ = postprocessor.Complete elif (val == 'sbacc' or val == 'statebasedacceptance' or val == 'state-based-acceptance'): sbac_ = postprocessor.SBAcc else: assert(val == 'unambiguous') unam_ = postprocessor.Unambiguous options = { 'any': pref_set, 'ba': type_set, 'buchi': type_set, 'cobuchi': type_set, 'colored': misc_set, 'complete': misc_set, 'dca': type_set, 'deterministic': pref_set, 'g': type_set, 'gen': type_set, 'generic': type_set, 'generalizedbuchi': type_set, 'high': optm_set, 'low': optm_set, 'medium': optm_set, 'monitor': type_set, 'nca': type_set, 'parity even': type_set, 'parity max even': type_set, 'parity max odd': type_set, 'parity max': type_set, 'parity min even': type_set, 'parity min odd': type_set, 'parity min': type_set, 'parity odd': type_set, 'parity': type_set, 'sbacc': misc_set, 'small': pref_set, 'statebasedacceptance': misc_set, 'state-based-acceptance': misc_set, 'tgba': type_set, 'unambiguous': misc_set, } for arg in args: arg = arg.lower() fn = options.get(arg) if fn: fn(arg) else: # arg is not an know option, but maybe it is a prefix of # one of them compat = [] f = None for key, fn in options.items(): if key.startswith(arg): compat.append(key) f = fn lc = len(compat) if lc == 1: f(compat[0]) elif lc < 1: raise ValueError("unknown option '{}'".format(arg)) else: raise ValueError("ambiguous option '{}' is prefix of {}" .format(arg, str(compat))) if type_ is None: type_ = default_type if pref_ is None: pref_ = postprocessor.Small if optm_ is None: optm_ = postprocessor.High obj.set_type(type_) obj.set_pref(pref_ | comp_ | unam_ | sbac_ | colo_) obj.set_level(optm_) def translate(formula, *args, dict=_bdd_dict, xargs=None): """Translate a formula into an automaton. Keep in mind that 'Deterministic' expresses just a preference that may not be satisfied. The optional arguments should be strings among the following: - at most one in 'GeneralizedBuchi', 'Buchi', or 'Monitor', 'generic', 'parity', 'parity min odd', 'parity min even', 'parity max odd', 'parity max even', 'coBuchi' (type of acceptance condition to build) - at most one in 'Small', 'Deterministic', 'Any' (preferred characteristics of the produced automaton) - at most one in 'Low', 'Medium', 'High' (optimization level) - any combination of 'Complete', 'Unambiguous', 'StateBasedAcceptance' (or 'SBAcc' for short), and 'Colored' (only for parity acceptance) The default corresponds to 'tgba', 'small' and 'high'. Additional options can be supplied using a `spot.option_map`, or a string (that will be converted to `spot.option_map`), as the `xargs` argument. This is similar to the `-x` option of command-line tools; so check out the spot-x(7) man page for details. """ if type(xargs) is str: xargs = option_map(xargs) a = translator(dict, xargs) _postproc_translate_options(a, postprocessor.TGBA, *args) if type(formula) == str: formula = parse_formula(formula) result = a.run(formula) if xargs: xargs.report_unused_options() return result formula.translate = translate # Wrap C++-functions into lambdas so that they get converted into # instance methods (i.e., self passed as first argument # automatically), because only user-defined functions are converted as # instance methods. def _add_formula(meth, name=None): setattr(formula, name or meth, (lambda self, *args, **kwargs: globals()[meth](self, *args, **kwargs))) _add_formula('contains') _add_formula('are_equivalent', 'equivalent_to') def postprocess(automaton, *args, formula=None, xargs=None): """Post process an automaton. This applies a number of simplification algorithms, depending on the options supplied. Keep in mind that 'Deterministic' expresses just a preference that may not be satisfied if the input is not already 'Deterministic'. The optional arguments should be strings among the following: - at most one in 'Generic', 'GeneralizedBuchi', 'Buchi', or 'Monitor', 'parity', 'parity min odd', 'parity min even', 'parity max odd', 'parity max even', 'coBuchi' (type of acceptance condition to build) - at most one in 'Small', 'Deterministic', 'Any' (preferred characteristics of the produced automaton) - at most one in 'Low', 'Medium', 'High' (optimization level) - any combination of 'Complete', 'StateBasedAcceptance' (or 'SBAcc' for short), and 'Colored (only for parity acceptance) The default corresponds to 'generic', 'small' and 'high'. If a formula denoted by this automaton is known, pass it to as the optional `formula` argument; it can help some algorithms by providing an easy way to complement the automaton. Additional options can be supplied using a `spot.option_map`, or a string (that will be converted to `spot.option_map`), as the `xargs` argument. This is similar to the `-x` option of command-line tools; so check out the spot-x(7) man page for details. """ if type(xargs) is str: xargs = option_map(xargs) p = postprocessor(xargs) if type(automaton) == str: automaton = globals()['automaton'](automaton) _postproc_translate_options(p, postprocessor.Generic, *args) result = p.run(automaton, formula) if xargs: xargs.report_unused_options() return result twa.postprocess = postprocess # Wrap C++-functions into lambdas so that they get converted into # instance methods (i.e., self passed as first argument # automatically), because only user-defined functions are converted as # instance methods. def _add_twa_graph(meth, name=None): setattr(twa_graph, name or meth, (lambda self, *args, **kwargs: globals()[meth](self, *args, **kwargs))) for meth in ('scc_filter', 'scc_filter_states', 'is_deterministic', 'is_unambiguous', 'contains', 'get_strategy', 'set_state_players', 'get_state_players', 'set_state_player', 'get_state_player', 'get_state_winners', 'get_state_winner'): _add_twa_graph(meth) _add_twa_graph('are_equivalent', 'equivalent_to') # Wrapper around a formula iterator to which we add some methods of formula # (using _addfilter and _addmap), so that we can write things like # formulas.simplify().is_X_free(). class formulaiterator: def __init__(self, formulas): self._formulas = formulas def __iter__(self): return self def __next__(self): return next(self._formulas) # fun shoud be a predicate and should be a method of formula. # _addfilter adds this predicate as a filter whith the same name in # formulaiterator. def _addfilter(fun): def filtf(self, *args, **kwargs): it = filter(lambda f: getattr(f, fun)(*args, **kwargs), self) return formulaiterator(it) def nfiltf(self, *args, **kwargs): it = filter(lambda f: not getattr(f, fun)(*args, **kwargs), self) return formulaiterator(it) if fun[:3] == 'is_': notfun = 'is_not_' + fun[3:] elif fun[:4] == 'has_': notfun = 'has_no_' + fun[4:] else: notfun = 'not_' + fun setattr(formulaiterator, fun, filtf) setattr(formulaiterator, notfun, nfiltf) # fun should be a function taking a formula as its first parameter and # returning a formula. _addmap adds this function as a method of # formula and formulaiterator. def _addmap(fun): def mapf(self, *args, **kwargs): return formulaiterator(map(lambda f: getattr(f, fun)(*args, **kwargs), self)) setattr(formula, fun, lambda self, *args, **kwargs: globals()[fun](self, *args, **kwargs)) setattr(formulaiterator, fun, mapf) def randltl(ap, n=-1, **kwargs): """Generate random formulas. Returns a random formula iterator. ap: the number of atomic propositions used to generate random formulas. n: number of formulas to generate, or unbounded if n < 0. **kwargs: seed: seed for the random number generator (0). output: can be 'ltl', 'psl', 'bool' or 'sere' ('ltl'). allow_dups: allow duplicate formulas (False). tree_size: tree size of the formulas generated, before mandatory simplifications (15) boolean_priorities: set priorities for Boolean formulas. ltl_priorities: set priorities for LTL formulas. sere_priorities: set priorities for SERE formulas. dump_priorities: show current priorities, do not generate any formula. simplify: 0 No rewriting 1 basic rewritings and eventual/universal rules 2 additional syntactic implication rules 3 (default) better implications using containment """ opts = option_map() output_map = { "ltl": randltlgenerator.LTL, "psl": randltlgenerator.PSL, "bool": randltlgenerator.Bool, "sere": randltlgenerator.SERE } if isinstance(ap, list): aprops = atomic_prop_set() for elt in ap: aprops.insert(formula.ap(elt)) ap = aprops ltl_priorities = kwargs.get("ltl_priorities", None) sere_priorities = kwargs.get("sere_priorities", None) boolean_priorities = kwargs.get("boolean_priorities", None) output = output_map[kwargs.get("output", "ltl")] opts.set("output", output) opts.set("seed", kwargs.get("seed", 0)) tree_size = kwargs.get("tree_size", 15) if isinstance(tree_size, tuple): tree_size_min, tree_size_max = tree_size else: tree_size_min = tree_size_max = tree_size opts.set("tree_size_min", tree_size_min) opts.set("tree_size_max", tree_size_max) opts.set("unique", not kwargs.get("allow_dups", False)) opts.set("wf", kwargs.get("weak_fairness", False)) simpl_level = kwargs.get("simplify", 0) if simpl_level > 3 or simpl_level < 0: sys.stderr.write('invalid simplification level: ' + simpl_level) return opts.set("simplification_level", simpl_level) rg = randltlgenerator(ap, opts, ltl_priorities, sere_priorities, boolean_priorities) dump_priorities = kwargs.get("dump_priorities", False) if dump_priorities: dumpstream = ostringstream() if output == randltlgenerator.LTL: print('Use argument ltl_priorities=STRING to set the following ' 'LTL priorities:\n') rg.dump_ltl_priorities(dumpstream) print(dumpstream.str()) elif output == randltlgenerator.Bool: print('Use argument boolean_priorities=STRING to set the ' 'following Boolean formula priorities:\n') rg.dump_bool_priorities(dumpstream) print(dumpstream.str()) elif output == randltlgenerator.PSL or output == randltlgenerator.SERE: if output != randltlgenerator.SERE: print('Use argument ltl_priorities=STRING to set the ' 'following LTL priorities:\n') rg.dump_psl_priorities(dumpstream) print(dumpstream.str()) print('Use argument sere_priorities=STRING to set the ' 'following SERE priorities:\n') rg.dump_sere_priorities(dumpstream) print(dumpstream.str()) print('Use argument boolean_priorities=STRING to set the ' 'following Boolean formula priorities:\n') rg.dump_sere_bool_priorities(dumpstream) print(dumpstream.str()) else: sys.stderr.write("internal error: unknown type of output") return class _randltliterator: def __init__(self, rg, n): self.rg = rg self.i = 0 self.n = n def __iter__(self): return self def __next__(self): if self.i == self.n: raise StopIteration f = self.rg.next() if f is None: sys.stderr.write("Warning: could not generate a new " "unique formula after {} trials.\n" .format(randltlgenerator.MAX_TRIALS)) raise StopIteration self.i += 1 return f return formulaiterator(_randltliterator(rg, n)) def simplify(f, **kwargs): level = kwargs.get('level', None) if level is not None: return tl_simplifier(tl_simplifier_options(level)).simplify(f) basics = kwargs.get('basics', True) synt_impl = kwargs.get('synt_impl', True) event_univ = kwargs.get('event_univ', True) cont_checks = kwargs.get('containment_checks', False) cont_checks_stronger = kwargs.get('containment_checks_stronger', False) nenoform_stop_on_boolean = kwargs.get('nenoform_stop_on_boolean', False) reduce_size_strictly = kwargs.get('reduce_size_strictly', False) boolean_to_isop = kwargs.get('boolean_to_isop', False) favor_event_univ = kwargs.get('favor_event_univ', False) simp_opts = tl_simplifier_options(basics, synt_impl, event_univ, cont_checks, cont_checks_stronger, nenoform_stop_on_boolean, reduce_size_strictly, boolean_to_isop, favor_event_univ) return tl_simplifier(simp_opts).simplify(f) for fun in dir(formula): if (callable(getattr(formula, fun)) and (fun[:3] == 'is_' or fun[:4] == 'has_')): _addfilter(fun) for fun in ['remove_x', 'relabel', 'relabel_bse', 'simplify', 'unabbreviate', 'negative_normal_form', 'mp_class', 'nesting_depth']: _addmap(fun) # Better interface to the corresponding C++ function. def sat_minimize(aut, acc=None, colored=False, state_based=False, states=0, max_states=0, sat_naive=False, sat_langmap=False, sat_incr=0, sat_incr_steps=0, display_log=False, return_log=False): args = '' if acc is not None: if type(acc) is not str: raise ValueError("argument 'acc' should be a string") args += ',acc="' + acc + '"' if colored: args += ',colored' if states: if type(states) is not int or states < 0: raise ValueError("argument 'states' should be a positive integer") args += ',states=' + str(states) if max_states: if type(max_states) is not int or max_states < 0: raise ValueError("argument 'states' should be a positive integer") args += ',max-states=' + str(max_states) if sat_naive: args += ',sat-naive' if sat_langmap: args += ',sat-langmap' if sat_incr: args += ',sat-incr=' + str(sat_incr) args += ',sat-incr-steps=' + str(sat_incr_steps) from spot.impl import sat_minimize as sm if display_log or return_log: import pandas as pd with tempfile.NamedTemporaryFile(dir='.', suffix='.satlog') as t: args += ',log="{}"'.format(t.name) aut = sm(aut, args, state_based) dfrm = pd.read_csv(t.name, dtype=object) if display_log: # old versions of ipython do not import display by default from IPython.display import display del dfrm['automaton'] display(dfrm) if return_log: return aut, dfrm else: return aut else: return sm(aut, args, state_based) def parse_word(word, dic=_bdd_dict): from spot.impl import parse_word as pw return pw(word, dic) def bdd_to_formula(b, dic=_bdd_dict): from spot.impl import bdd_to_formula as bf return bf(b, dic) def language_containment_checker(dic=_bdd_dict): from spot.impl import language_containment_checker as c c.contains = lambda this, a, b: c.contained(this, b, a) c.are_equivalent = lambda this, a, b: c.equal(this, a, b) return c(dic) def mp_hierarchy_svg(cl=None): """ Return an some string containing an SVG picture of the Manna & Pnueli hierarchy, highlighting class `cl` if given. If not None, `cl` should be one of 'TPROGSB'. For convenience, if `cl` is an instance of `spot.formula`, it is replaced by `mp_class(cl)`. """ if type(cl) == formula: cl = mp_class(cl) ch = None coords = { 'T': '110,35', 'R': '40,80', 'P': '175,80', 'O': '110,140', 'S': '40,160', 'G': '175,160', 'B': '110,198', } if cl in coords: highlight = ''' '''.format(coords[cl]) else: highlight = '' return ''' ''' + highlight + ''' Reactivity Recurrence Persistence Obligation Safety Guarantee Monitor Deterministic Büchi Terminal Büchi Weak Büchi ''' def show_mp_hierarchy(cl): """ Return a picture of the Manna & Pnueli hierarchy as an SVG object in the IPython/Jupyter. """ from spot.jupyter import SVG return SVG(mp_hierarchy_svg(cl)) formula.show_mp_hierarchy = show_mp_hierarchy @_extend(twa_word) class twa_word: def _repr_latex_(self): bd = self.get_dict() res = '$' for idx, letter in enumerate(self.prefix): if idx: res += '; ' res += bdd_to_formula(letter, bd).to_str('j') if len(res) > 1: res += '; ' res += '\\mathsf{cycle}\\{' for idx, letter in enumerate(self.cycle): if idx: res += '; ' res += bdd_to_formula(letter, bd).to_str('j') return res + '\\}$' def as_svg(self): """ Build an SVG picture representing the word as a collection of signals for each atomic proposition. """ # Get the list of atomic proposition used sup = buddy.bddtrue for cond in list(self.prefix) + list(self.cycle): sup = sup & buddy.bdd_support(cond) ap = [] while sup != buddy.bddtrue: a = buddy.bdd_var(sup) ap.append(a) sup = buddy.bdd_high(sup) # Prepare canvas psize = len(self.prefix) csize = len(self.cycle) d = { 'endprefix': 50 * psize, 'endcycle': 50 * (psize + csize), 'w': 50 * (psize + csize * 2), 'height': 50 * len(ap), 'height2': 50 * len(ap) + 10, 'h3': 50 * len(ap) + 12, 'bgcolor': '#f4f4f4', 'bgl': 'stroke="white" stroke-width="4"', 'bgt': 'stroke="white" stroke-width="1"', 'txt': 'text-anchor="start" font-size="20"', 'red': 'stroke="#ff0000" stroke-width="2"', 'sml': 'text-anchor="start" font-size="10"' } txt = ''' '''.format(**d) # Iterate over all used atomic propositions, and fill each line l = list(self.prefix) + list(self.cycle) + list(self.cycle) bd = self.get_dict() for ypos, a in enumerate(ap): pa = buddy.bdd_ithvar(a) na = buddy.bdd_nithvar(a) name = bdd_format_formula(bd, pa) # Whether the last state was down (-1), up (1), or unknown (0) last = 0 txt += ('' .format(y=ypos*50, **d)) txt += ('{name}' .format(x=3, y=ypos*50+30, name=name, **d)) for xpos, step in enumerate(l): if buddy.bdd_implies(step, pa): cur = 1 elif buddy.bdd_implies(step, na): cur = -1 else: cur = 0 txt += ('' .format(x=(xpos+1)*50, y1=ypos*50, y2=ypos*50+50, **d)) if cur != 0: if last == -cur: txt += \ ('' .format(x=xpos*50, y1=ypos*50+5, y2=ypos*50+45, **d)) txt += \ ('' .format(x1=xpos*50, x2=(xpos+1)*50, y=ypos*50+25-20*cur, **d)) last = cur if psize > 0: txt += 'prefix'.format(**d) txt += '''cycle cycle'''.format(**d) return txt + '' def show(self): """ Display the word as an SVG picture of signals. """ from spot.jupyter import SVG return SVG(self.as_svg()) # Make scc_and_mark filter usable as context manager @_extend(scc_and_mark_filter) class scc_and_mark_filter: def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self.restore_acceptance() def to_parity(aut, options = None, **kwargs): """Convert aut into a parity acceptance. This procedure combines multiple strategies to attempt to produce a small parity automaton. The resulting parity acceptance is either "max odd" or "max even". The default optimization options maybe altered by passing either an instance of to_parity_options object as options argument, or by passing additional kwargs that will be used to update options. Note that if you pass both your own options object and kwargs, options will be updated in place. """ if options is None: # Can't make this a default option, otherwise the default # instance would be updated by the kwargs. options = to_parity_options() if kwargs: for key,val in to_parity_options.__dict__.items(): if not key.startswith('_') and key != "thisown" and key in kwargs: setattr(options, key, kwargs.get(key)) return impl.to_parity(aut, options)