* doc/org/tut40.org: Add more explanations and some cleanup. * python/spot/__init__.py (set_state_players, get_state_winners, get_state_players, set_state_player, get_state_winner, get_state_player, get_strategy): Add these methods to the twa_graph class for convenience. * NEWS, doc/org/tut.org: Mention tut40.org.
1516 lines
53 KiB
Python
1516 lines
53 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright (C) 2014-2021 Laboratoire de
|
|
# Recherche et Développement de l'Epita (LRDE).
|
|
#
|
|
# This file is part of Spot, a model checking library.
|
|
#
|
|
# Spot is free software; you can redistribute it and/or modify it
|
|
# under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# Spot is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
|
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
|
|
# License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
|
import sys
|
|
|
|
# Abort the import early on interpreters older than Python 3.3
# (hexversion 0x03030000), with an explicit message.
if sys.hexversion < 0x03030000:
    sys.exit("This module requires Python 3.3 or newer")
|
|
|
|
import subprocess
|
|
import os
|
|
import signal
|
|
import tempfile
|
|
from contextlib import suppress as _supress
|
|
|
|
if 'SPOT_UNINSTALLED' in os.environ:
    # When Spot is installed, _impl.so will be in the same directory as
    # spot/impl.py, however when we run Spot's test suite, Spot is not yet
    # installed and we want "from . import _impl" (generated by Swig4) to look
    # into .libs/
    __path__.extend(sys.path)

# We may have third-party plugins that want to be loaded as "spot.xxx", but
# that are installed in a different $prefix.  This sets things so that any
# file that looks like spot-extra/xxx.py can be loaded with "import spot.xxx".
for path in sys.path:
    # Entries that are already part of the package search path are skipped.
    if path not in __path__:
        path += "/spot-extra"
        # Only existing spot-extra directories are added.
        if os.path.isdir(path):
            __path__.append(path)
|
|
|
|
|
|
from spot.impl import *
|
|
# spot.aux_ used to be called spot.aux until the filename aux.py
|
|
# caused issues on Windows. So the file is now named aux_.py, but we
|
|
# still want to import it as spot.aux, hence we add it to sys.modules
|
|
# as an alias.
|
|
import spot.aux_ as aux
|
|
sys.modules['spot.aux'] = aux
|
|
from spot.aux import \
|
|
extend as _extend, \
|
|
str_to_svg as _str_to_svg, \
|
|
ostream_to_svg as _ostream_to_svg
|
|
|
|
|
|
# The parameters used by default when show() is called on an automaton.
# This is overwritten by setup(show_default=...).
_show_default = None
|
|
|
|
|
|
def setup(**kwargs):
    """Configure Spot for fancy display.

    This is mainly useful in Jupyter/IPython.

    Note that this function needs to be called before any automaton is
    displayed.  Afterwards it will have no effect (you should restart
    Python, or the Jupyter/IPython Kernel).

    The configuration is exported through the SPOT_DOTEXTRA and
    SPOT_DOTDEFAULT environment variables, and through the module-level
    `_show_default` variable.

    Parameters
    ----------
    bullets : bool
        whether to display acceptance conditions as UTF8 bullets
        (default: True)
    fillcolor : str
        the color to use for states (default: '#ffffaa')
    size : str
        the width and height of the GraphViz output in inches
        (default: '10.13,5')
    font : str
        the font to use in the GraphViz output (default: 'Lato')
    show_default : str
        default options for show() (default: None)
    max_states : int
        maximum number of states in GraphViz output (default: 50)
    """
    global _show_default

    # Extra GraphViz attributes appended to every dot output.
    extra = 'size="{}" edge[arrowhead=vee, arrowsize=.7]'
    os.environ['SPOT_DOTEXTRA'] = extra.format(kwargs.get('size', '10.13,5'))

    # Default dot options: font, fill color, then the optional 'B'
    # (bullets) flag and the '<N' limit on the number of states.
    bullets = 'B' if kwargs.get('bullets', True) else ''
    max_states = '<' + str(kwargs.get('max_states', 50))
    dot_default = 'rf({})C({}){}'.format(kwargs.get('font', 'Lato'),
                                         kwargs.get('fillcolor', '#ffffaa'),
                                         bullets + max_states)

    _show_default = kwargs.get('show_default', None)
    os.environ['SPOT_DOTDEFAULT'] = dot_default
|
|
|
|
|
|
# In version 3.0.2, Swig puts strongly typed enum in the main
# namespace without prefixing them.  Later versions fix this.  So we
# can remove the following hack once 3.0.2 is no longer used in our
# build farm.
if 'op_ff' not in globals():
    for i in ('ff', 'tt', 'eword', 'ap', 'Not', 'X', 'F', 'G',
              'Closure', 'NegClosure', 'NegClosureMarked',
              'Xor', 'Implies', 'Equiv', 'U', 'R', 'W', 'M',
              'EConcat', 'EConcatMarked', 'UConcat', 'Or',
              'OrRat', 'And', 'AndRat', 'AndNLM', 'Concat',
              'Fusion', 'Star', 'FStar'):
        # Re-publish the unprefixed name as op_<name>, then drop the
        # unprefixed global.
        globals()['op_' + i] = globals()[i]
        del globals()[i]
|
|
|
|
|
|
# Global BDD dict so that we do not have to create one in user code.
# It is the dictionary used by default by make_twa_graph(), automata()
# and translate() in this module.
_bdd_dict = make_bdd_dict()
|
|
|
|
|
|
# Keep a handle on the generated constructor so the replacement below
# can delegate to it.
__om_init_tmp = option_map.__init__


def __om_init_new(self, str=None):
    """Build an option_map, optionally parsing the option string `str`.

    Raises RuntimeError when parse_options() returns a true value.
    """
    __om_init_tmp(self)
    if not str:
        return
    if self.parse_options(str):
        raise RuntimeError("failed to parse option at: '" + str + "'")


option_map.__init__ = __om_init_new
|
|
|
|
@_extend(aig)
class aig:
    def _repr_svg_(self, opt=None):
        """Return the SVG obtained from the dot output of this circuit."""
        stream = ostringstream()
        print_dot(stream, self, opt)
        return _ostream_to_svg(stream)

    def show(self, opt=None):
        """Return the SVG rendering wrapped in a spot.jupyter.SVG object."""
        from spot.jupyter import SVG
        svg = self._repr_svg_(opt)
        return SVG(svg)

    def to_str(self, format='circuit', opt=None):
        """Return the circuit as a string.

        `format` is case-insensitive and must be 'circuit' (output of
        print_aiger) or 'dot' (output of print_dot, which receives
        `opt`).  Any other value raises ValueError.
        """
        format = format.lower()
        if format not in ('circuit', 'dot'):
            raise ValueError("unknown string format: " + format)
        stream = ostringstream()
        if format == 'circuit':
            print_aiger(stream, self)
        else:
            print_dot(stream, self, opt)
        return stream.str()

    def evaluate(self, gen):
        """Step the circuit over the input vectors produced by `gen`.

        This is a generator: for each input vector it yields a list of
        (output name, value) pairs.  A vector whose length differs from
        num_inputs() raises ValueError.
        """
        self.circ_init()
        positions = self.outputs()
        names = self.output_names()
        for inputs in gen:
            if len(inputs) != self.num_inputs():
                raise ValueError("Incorrect number of inputs")
            self.circ_step(inputs)
            state = self.circ_state()
            outputs = [state[pos] for pos in positions]
            assert(len(outputs) == len(names))
            yield list(zip(names, outputs))
|
|
|
|
__twa__acc1 = twa.acc
__twa__acc2 = twa.get_acceptance


# We store the automaton into the acceptance condition
# returned by acc so that it does not crash when
# taking the reference to a temporary, as in
#    spot.translate('Foo').acc()
# Issue #468.
def __twa_acc1_tmp(self):
    """Call the original twa.acc() and attach `self` to the result."""
    acc = __twa__acc1(self)
    acc._aut = self
    return acc


def __twa_acc2_tmp(self):
    """Call the original twa.get_acceptance() and attach `self` to it."""
    acc = __twa__acc2(self)
    acc._aut = self
    return acc


twa.acc = __twa_acc1_tmp
twa.get_acceptance = __twa_acc2_tmp
|
|
|
|
|
|
@_extend(twa, ta)
class twa:
    def _repr_svg_(self, opt=None):
        """Output the automaton as SVG.

        When `opt` is None, the module-level `_show_default` options
        are passed to print_dot() instead.
        """
        options = _show_default if opt is None else opt
        stream = ostringstream()
        print_dot(stream, self, options)
        return _ostream_to_svg(stream)

    def show(self, opt=None):
        """Display the automaton as SVG, in the IPython/Jupyter notebook.

        When `opt` is None, the module-level `_show_default` options
        are used.
        """
        options = _show_default if opt is None else opt
        # Load the SVG function only if we need it.  This way the
        # bindings can still be used outside of IPython if IPython is
        # not installed.
        from spot.jupyter import SVG
        return SVG(self._repr_svg_(options))

    def highlight_states(self, states, color):
        """Highlight a list of states.  This can be a list of
        state numbers, or a list of Booleans.

        A Boolean entry highlights the state whose number is the
        position of that entry, when the entry is True.  Returns self.
        """
        for position, item in enumerate(states):
            if type(item) is not bool:
                self.highlight_state(item, color)
            elif item:
                self.highlight_state(position, color)
        return self

    def highlight_edges(self, edges, color):
        """Highlight a list of edges.  This can be a list of
        edge numbers, or a list of Booleans.

        A Boolean entry highlights the edge whose number is the
        position of that entry, when the entry is True.  Returns self.
        """
        for position, item in enumerate(edges):
            if type(item) is not bool:
                self.highlight_edge(item, color)
            elif item:
                self.highlight_edge(position, color)
        return self
|
|
|
|
@_extend(twa)
class twa:
    def to_str(a, format='hoa', opt=None):
        """Return the automaton as a string.

        `format` is case-insensitive and selects the printer: 'hoa'
        (print_hoa), 'dot' (print_dot), 'spin' (print_never_claim) or
        'lbtt' (print_lbtt).  `opt` is passed to the printer.  Any
        other format raises ValueError.
        """
        format = format.lower()
        if format == 'hoa':
            printer = print_hoa
        elif format == 'dot':
            printer = print_dot
        elif format == 'spin':
            printer = print_never_claim
        elif format == 'lbtt':
            printer = print_lbtt
        else:
            raise ValueError("unknown string format: " + format)
        ostr = ostringstream()
        printer(ostr, a, opt)
        return ostr.str()

    def save(a, filename, format='hoa', opt=None, append=False):
        """Write the automaton to `filename` and return the automaton.

        The text is produced by to_str(format, opt), and a final
        newline is added when the text does not end with one.  The
        file is appended to when `append` is true, and overwritten
        otherwise.
        """
        # Render before opening the file, so that an unsupported format
        # raises without creating or truncating `filename`.
        s = a.to_str(format, opt)
        with open(filename, 'a' if append else 'w') as f:
            f.write(s)
            # endswith() also copes with an empty string, where s[-1]
            # would raise IndexError.
            if not s.endswith('\n'):
                f.write('\n')
        return a
|
|
|
|
|
|
@_extend(twa_graph)
class twa_graph:
    def show_storage(self, opt=None):
        """Return a spot.jupyter.SVG built from dump_storage_as_dot()."""
        stream = ostringstream()
        self.dump_storage_as_dot(stream, opt)
        from spot.jupyter import SVG
        svg = _ostream_to_svg(stream)
        return SVG(svg)

    def __copy__(self):
        """Support copy.copy() by building a new graph from this one."""
        props = twa_prop_set.all()
        return make_twa_graph(self, props, True)
|
|
|
|
|
|
def make_twa_graph(*args):
    """Forward to spot.impl.make_twa_graph.

    When called without any argument, the module-level `_bdd_dict` is
    passed as the only argument.
    """
    from spot.impl import make_twa_graph as mtg
    if not args:
        args = (_bdd_dict,)
    return mtg(*args)
|
|
|
|
|
|
@_extend(formula)
class formula:
    def __init__(self, str):
        """Parse the given string to create a formula.

        If the argument is already a formula, it is stored as is
        instead of being parsed.
        """
        self.this = str if type(str) == formula else parse_formula(str)

    def show_ast(self):
        """Display the syntax tree of the formula."""
        # Load the SVG function only if we need it.  This way the bindings
        # can still be used outside of IPython if IPython is not
        # installed.
        from spot.jupyter import SVG
        dot = self.to_str('d')
        return SVG(_str_to_svg(dot))

    def _repr_latex_(self):
        """Return the MathJax ('j') rendering enclosed in dollar signs."""
        return '$' + self.to_str('j') + '$'

    def to_str(self, format='spot', parenth=False):
        """Return the formula as text in the requested `format`.

        Each format has a long name and a one-letter alias: 'spot'/'f',
        'spin'/'s', 'utf8'/'8', 'lbt'/'l', 'wring'/'w', 'latex'/'x',
        'sclatex'/'X', 'mathjax'/'j' and 'dot'/'d'.  `parenth` is
        passed to the printers that accept it (all but lbt, wring and
        dot).  The 'dot' format returns UTF-8 encoded bytes; the other
        formats return what their printer returns.  An unknown format
        raises ValueError.
        """
        if format in ('spot', 'f'):
            return str_psl(self, parenth)
        if format in ('spin', 's'):
            return str_spin_ltl(self, parenth)
        if format in ('utf8', '8'):
            return str_utf8_psl(self, parenth)
        if format in ('lbt', 'l'):
            return str_lbt_ltl(self)
        if format in ('wring', 'w'):
            return str_wring_ltl(self)
        if format in ('latex', 'x'):
            return str_latex_psl(self, parenth)
        if format in ('sclatex', 'X'):
            return str_sclatex_psl(self, parenth)
        if format in ('mathjax', 'j'):
            # Same as 'sclatex', with the quotes replaced by \unicode
            # commands.
            text = str_sclatex_psl(self, parenth)
            text = text.replace("``", "\\unicode{x201C}")
            return text.replace("\\textrm{''}", "\\unicode{x201D}")
        if format in ('dot', 'd'):
            stream = ostringstream()
            print_dot_psl(stream, self)
            return stream.str().encode('utf-8')
        raise ValueError("unknown string format: " + format)

    def __format__(self, spec):
        """Format the formula according to `spec`.

        Parameters
        ----------
        spec : str, optional
            a list of letters that specify how the formula
            should be formatted.

        Supported specifiers
        --------------------

        - 'f': use Spot's syntax (default)
        - '8': use Spot's syntax in UTF-8 mode
        - 's': use Spin's syntax
        - 'l': use LBT's syntax
        - 'w': use Wring's syntax
        - 'x': use LaTeX output
        - 'X': use self-contained LaTeX output
        - 'j': use self-contained LaTeX output, adjusted for MathJax

        Add some of those letters for additional options:

        - 'p': use full parentheses
        - 'c': escape the formula for CSV output (this will
               enclose the formula in double quotes, and escape
               any included double quotes)
        - 'h': escape the formula for HTML output
        - 'd': escape double quotes and backslash,
               for use in C-strings (the outermost double
               quotes are *not* added)
        - 'q': quote and escape for shell output, using single
               quotes or double quotes depending on the contents.
        - '[...]': rewrite away all the operators specified in brackets,
               using spot.unabbreviate().

        - ':spec': pass the remaining specification to the
                   formatting function for strings.

        Unknown letters and unclosed brackets raise ValueError.
        """
        syntax = 'f'
        parent = False
        escape = None
        form = self

        # Consume the specification one character at a time; whatever
        # follows a ':' is kept in `spec` for the final string formatting.
        while spec:
            c = spec[0]
            spec = spec[1:]
            if c == ':':
                break
            if c in 'fs8lwxXj':
                syntax = c
            elif c == 'p':
                parent = True
            elif c in 'cdhq':
                escape = c
            elif c == '[':
                ops, closing, rest = spec.partition(']')
                if not closing:
                    raise ValueError("unclosed bracket: [" + spec)
                form = form.unabbreviate(ops)
                spec = rest
            else:
                raise ValueError("unknown format specification: " + c + spec)

        s = form.to_str(syntax, parent)

        if escape == 'd':
            s = escape_str(s)
        elif escape is not None:
            # The other escaping functions write into an output stream.
            buf = ostringstream()
            if escape == 'c':
                escape_rfc4180(buf, s)
                s = '"' + buf.str() + '"'
            elif escape == 'h':
                escape_html(buf, s)
                s = buf.str()
            else:
                quote_shell_string(buf, s)
                s = buf.str()

        return s.__format__(spec)

    def traverse(self, func, *args):
        """Call `func(f, *args)` on this formula, then on its children.

        The children of a formula are not visited when `func` returns
        a true value for that formula.
        """
        if not func(self, *args):
            for child in self:
                child.traverse(func, *args)

    def map(self, func, *args):
        """Rebuild this formula after applying `func` to its children.

        Constants and atomic propositions are returned unchanged.  For
        the other operators, a formula of the same kind is built from
        `func(child, *args)` for each child.  A formula of an unknown
        kind raises ValueError.
        """
        kind = self.kind()
        leaves = (op_ff, op_tt, op_eword, op_ap)
        unary = (op_Not, op_X, op_F, op_G, op_Closure,
                 op_NegClosure, op_NegClosureMarked)
        binary = (op_Xor, op_Implies, op_Equiv, op_U, op_R, op_W,
                  op_M, op_EConcat, op_EConcatMarked, op_UConcat)
        nary = (op_Or, op_OrRat, op_And, op_AndRat, op_AndNLM,
                op_Concat, op_Fusion)
        bounded = (op_Star, op_FStar)

        if kind in leaves:
            return self
        if kind in unary:
            return formula.unop(kind, func(self[0], *args))
        if kind in binary:
            left = func(self[0], *args)
            right = func(self[1], *args)
            return formula.binop(kind, left, right)
        if kind in nary:
            return formula.multop(kind, [func(child, *args) for child in self])
        if kind in bounded:
            return formula.bunop(kind, func(self[0], *args),
                                 self.min(), self.max())
        raise ValueError("unknown type of formula")
|
|
|
|
|
|
@_extend(atomic_prop_set)
class atomic_prop_set:
    def _repr_latex_(self):
        """Return the set as a LaTeX string of the form $\\{a, b\\}$.

        Each atomic proposition is rendered with to_str('j').  Names
        whose rendering contains no \\unicode command are enclosed in
        \\unicode{x201C} ... \\unicode{x201D} quotes.
        """
        names = []
        for ap in self:
            apname = ap.to_str('j')
            if '\\unicode{' not in apname:
                apname = "\\unicode{x201C}" + apname + "\\unicode{x201D}"
            names.append(apname)
        # The backslashes are doubled: '\{' and '\}' are invalid escape
        # sequences that recent Python versions warn about.
        return '$\\{' + ', '.join(names) + '\\}$'
|
|
|
|
|
|
@_extend(zielonka_tree)
class zielonka_tree:
    def _repr_svg_(self):
        """Return the SVG obtained from the dot output of the tree."""
        stream = ostringstream()
        self.dot(stream)
        return _ostream_to_svg(stream)
|
|
|
|
# Counter used to give a distinct number to each HTML rendering of an ACD.
_acdnum = 0


@_extend(acd)
class acd:
    def _repr_svg_(self, id=None):
        """Output the ACD as SVG.

        `id` is passed unchanged to dot().
        """
        ostr = ostringstream()
        self.dot(ostr, id)
        return _ostream_to_svg(ostr)

    def _repr_html_(self):
        """Return an HTML fragment showing the automaton and the ACD.

        The fragment contains a <style> element, the SVG of the
        automaton (with id "acdaut<num>"), the SVG of the ACD (with id
        "acd<num>"), and a <script> element that installs click
        handlers on edges, states and nodes.  <num> comes from the
        module-level `_acdnum` counter, incremented on each call.
        NOTE(review): the script uses `$`, so it presumably relies on
        jQuery being available in the page -- confirm.
        """
        global _acdnum
        num = _acdnum
        _acdnum += 1
        style = '''
.acdhigh ellipse,.acdacc ellipse,.acdacc path,.acdacc polygon{stroke:green;}
.acdhigh polygon,.acdrej ellipse,.acdrej path,.acdrej polygon{stroke:red;}
.acdbold ellipse,.acdbold polygon,.acdbold path{stroke-width:2;}
.acdrej polygon{fill:red;}
.acdacc polygon{fill:green;}
'''
        # The backslash of the regular expression /\s+/ is doubled: '\s'
        # is an invalid escape sequence in a non-raw Python string.
        script = ['''
function acd{num}_clear(){{
$("#acd{num} .node,#acdaut{num} .node,#acdaut{num} .edge")
.removeClass("acdhigh acdbold acdacc acdrej");
}};
function acd{num}_state(state){{
acd{num}_clear();
$("#acd{num} .acdS" + state).addClass("acdhigh acdbold");
$("#acdaut{num} #S" + state).addClass("acdbold");
}};
function acd{num}_edge(edge){{
acd{num}_clear();
var theedge = $('#acdaut{num} #E' + edge)
var classList = theedge.attr('class').split(/\\s+/);
$.each(classList, function(index, item) {{
if (item.startsWith('acdN')) {{
$("#acd{num} #" + item.substring(3)).addClass("acdhigh acdbold");
}}
}});
theedge.addClass("acdbold");
}};
function acd{num}_node(node, acc){{
acd{num}_clear();
$("#acdaut{num} .acdN" + node).addClass(acc
? "acdacc acdbold"
: "acdrej acdbold");
$("#acd{num} #N" + node).addClass("acdbold acdhigh");
}};'''.format(num=num)]

        # Tag each edge with the nodes that list it, and remember the
        # largest edge number seen.
        me = 0
        for n in range(self.node_count()):
            for e in self.edges_of_node(n):
                me = max(e, me)
                script.append('$("#acdaut{num} #E{e}").addClass("acdN{n}");'
                              .format(num=num, e=e, n=n))
        # Click handlers for edges 1..me, for every state, and for
        # every node.
        for e in range(1, me + 1):
            script.append('$("#acdaut{num} #E{e}")'
                          '.click(function(){{acd{num}_edge({e});}});'
                          .format(num=num, e=e))
        for s in range(self.get_aut().num_states()):
            script.append('$("#acdaut{num} #S{s}")'
                          '.click(function(){{acd{num}_state({s});}});'
                          .format(num=num, s=s))
        for n in range(self.node_count()):
            v = int(self.node_acceptance(n))
            script.append('$("#acd{num} #N{n}")'
                          '.click(function(){{acd{num}_node({n}, {v});}});'
                          .format(num=num, n=n, v=v))
        js = ''.join(script)

        html = '<style>{}</style><div>{}</div><div>{}</div><script>{}</script>'\
            .format(style,
                    self.get_aut().show('.i(acdaut{})'.format(num)).data,
                    self._repr_svg_("acd{}".format(num)),
                    js)
        return html
|
|
|
|
|
|
def automata(*sources, timeout=None, ignore_abort=True,
             trust_hoa=True, no_sid=False, debug=False,
             want_kripke=False):
    r"""Read automata from a list of sources.

    Parameters
    ----------
    *sources : list of str
        These sources can be either commands (end with `|`),
        textual representations of automata (contain `\n`),
        or filenames (else).
    timeout : int, optional
        Number of seconds to wait for the result of a command.
        If None (the default), no limit is used.
    ignore_abort : bool, optional
        If True (the default), skip HOA automata that end with
        `--ABORT--`, and return the next automaton in the stream.
        If False, aborted automata are reported as syntax errors.
    trust_hoa : bool, optional
        If True (the default), supported HOA properties that
        cannot be easily verified are trusted.
    want_kripke : bool, optional
        If True, the input is expected to describe Kripke
        structures, in the HOA format, and the returned type
        will be of type kripke_graph_ptr.
    no_sid : bool, optional
        When an automaton is obtained from a subprocess, this
        subprocess is started from a shell with its own session
        group (the default) unless no_sid is set to True.
    debug : bool, optional
        Whether to run the parser in debug mode.

    Notes
    -----

    The automata can be written in the `HOA format`_, as `never
    claims`_, in `LBTT's format`_, or in `ltl2dstar's format`_.

    .. _HOA format: http://adl.github.io/hoaf/
    .. _never claims: http://spinroot.com/spin/Man/never.html
    .. _LBTT's format:
       http://www.tcs.hut.fi/Software/lbtt/doc/html/Format-for-automata.html
    .. _ltl2dstar's format:
       http://www.ltl2dstar.de/docs/ltl2dstar.html#output-format-dstar

    If an argument ends with a `|`, then this argument is interpreted as
    a shell command, and the output of that command (without the `|`)
    is parsed.

    If an argument contains a newline, then it is interpreted as
    actual contents to be parsed.

    Otherwise, the argument is assumed to be a filename.

    The result of this function is a generator on all the automata
    objects read from these sources.  The typical usage is::

        for aut in spot.automata(filename, command, ...):
            # do something with aut

    When the source is a command, and no `timeout` is specified,
    parsing is done straight out of the pipe connecting the
    command.  So

        for aut in spot.automata('randaut -H -n 10 2 |'):
            process(aut)

    will call `process(aut)` on each automaton as soon as it is output by
    `randaut`, and without waiting for `randaut` to terminate.

    However if `timeout` is passed, then `automata()` will wait for
    the entire command to terminate before parsing its entire output.
    If one command takes more than `timeout` seconds,
    `subprocess.TimeoutExpired` is raised.

    If any command terminates with a non-zero error,
    `subprocess.CalledProcessError` is raised.
    """

    o = automaton_parser_options()
    o.debug = debug
    o.ignore_abort = ignore_abort
    o.trust_hoa = trust_hoa
    o.raise_errors = True
    o.want_kripke = want_kripke

    for filename in sources:
        try:
            p = None
            proc = None
            # endswith() also copes with an empty source string, where
            # filename[-1] would raise IndexError.
            if filename.endswith('|'):
                # universal_newlines for str output instead of bytes
                # when the pipe is read from Python (which happens
                # when timeout is set).
                prefn = None if no_sid else os.setsid
                proc = subprocess.Popen(filename[:-1], shell=True,
                                        preexec_fn=prefn,
                                        universal_newlines=True,
                                        stdout=subprocess.PIPE)
                if timeout is None:
                    # Parse directly from the pipe of the subprocess.
                    p = automaton_stream_parser(proc.stdout.fileno(),
                                                filename, o)
                else:
                    try:
                        out, _ = proc.communicate(timeout=timeout)
                    except subprocess.TimeoutExpired:
                        # Using subprocess.check_output() with timeout
                        # would just kill the shell, not its children.
                        os.killpg(proc.pid, signal.SIGKILL)
                        raise
                    else:
                        ret = proc.wait()
                        if ret:
                            raise subprocess.CalledProcessError(ret,
                                                                filename[:-1])
                    finally:
                        # The process has been dealt with here, so the
                        # code below must neither use it as a context
                        # manager nor check its exit status again.
                        proc = None
                    p = automaton_stream_parser(out, filename, o)
            elif '\n' in filename:
                p = automaton_stream_parser(filename, "<string>", o)
            else:
                p = automaton_stream_parser(filename, o)
            a = True
            # Using proc as a context manager ensures that proc.stdout will be
            # closed on exit, and the process will be properly waited for.
            # This is important when running tools that produce an infinite
            # stream of automata and that must be killed once the generator
            # returned by spot.automata() is destroyed.  Otherwise, _supress()
            # is just a dummy context manager that does nothing (Python 3.7
            # introduces nullcontext() for this purpose, but at the time of
            # writing we support Python 3.4).
            mgr = proc if proc else _supress()
            with mgr:
                while a:
                    # the automaton is None when we reach the end of the file.
                    res = p.parse(_bdd_dict)
                    a = res.ks if want_kripke else res.aut
                    if a:
                        yield a
        finally:
            # Make sure we destroy the parser (p) and the subprocess
            # (proc) in the correct order.
            del p
            if proc is not None:
                ret = proc.returncode
                del proc
                # Do not complain about the exit code if we are already raising
                # an exception.
                if ret and sys.exc_info()[0] is None:
                    raise subprocess.CalledProcessError(ret, filename[:-1])
    # deleting o explicitly now prevents Python 3.5 from
    # reporting the following error: "<built-in function
    # delete_automaton_parser_options> returned a result with
    # an error set".  It's not clear to me if the bug is in Python
    # or Swig.  At least it's related to the use of generators.
    del o
    return
|
|
|
|
|
|
def automaton(filename, **kwargs):
    """Read a single automaton from a file.

    The keyword arguments are forwarded to `spot.automata`; see that
    function for a list of supported formats.  RuntimeError is raised
    when no automaton can be read.
    """
    stream = automata(filename, **kwargs)
    try:
        return next(stream)
    except StopIteration:
        raise RuntimeError("Failed to read automaton from {}".format(filename))
|
|
|
|
def aiger_circuits(*sources, bdd_dict = None):
    r"""Read aiger circuits from a list of sources.

    This is a generator that yields one circuit per source.

    Parameters
    ----------
    *sources : list of str
        Each source is either the textual representation of a circuit
        (it starts with "aag" and contains '\n'), or anything else,
        which is passed as is to aig.parse_aag().
    bdd_dict : If not supplied, a fresh bdd_dict will be created, common for
        all of the circuits.
    """
    if bdd_dict is None:
        bdd_dict = make_bdd_dict()

    for source in sources:
        is_text = source.startswith("aag") and '\n' in source
        if is_text:
            yield aig.parse_aag(source, "<string>", bdd_dict)
        else:
            yield aig.parse_aag(source, bdd_dict)
|
|
|
|
def aiger_circuit(source, bdd_dict = None):
    """Read a single aiger circuit from file or textual representation.

    See `spot.aiger_circuits` for a list of supported formats.
    RuntimeError is raised when no circuit can be read.
    """
    circuits = aiger_circuits(source, bdd_dict = bdd_dict)
    try:
        return next(circuits)
    except StopIteration:
        raise RuntimeError("Failed to read an aiger circuit "
                           "from {}".format(source))
|
|
|
|
|
|
def _postproc_translate_options(obj, default_type, *args):
    """Configure `obj` from a list of textual options.

    Each string of `args` is lowercased and looked up among the known
    option names; an unambiguous prefix of an option name is accepted
    too.  The options select an output type, a preference, an
    optimization level, and some extra flags.  The result is applied
    with obj.set_type(), obj.set_pref() and obj.set_level().

    When not specified, the type is `default_type`, the preference is
    postprocessor.Small, and the level is postprocessor.High.

    The historical type 'ba' also enables state-based acceptance, and
    'dca' also asks for a deterministic automaton unless a preference
    ('any', 'small', 'deterministic') is given explicitly.

    Raises ValueError for unknown or ambiguous options, and when two
    different types, preferences, or optimization levels are given.
    """
    type_name_ = None
    type_ = None
    pref_name_ = None
    pref_ = None
    # Preference implied by the type ('dca'); it is only used when no
    # preference is given explicitly, so that it can never conflict
    # with an explicit one.
    implied_pref_ = None
    optm_name_ = None
    optm_ = None
    comp_ = 0
    unam_ = 0
    sbac_ = 0
    colo_ = 0

    # Name of the postprocessor attribute for each type option.  The
    # attribute is looked up only when the option is actually used.
    type_attrs = {
        'generic': 'Generic',
        'gen': 'Generic',
        'g': 'Generic',
        'tgba': 'GeneralizedBuchi',  # historical
        'generalizedbuchi': 'GeneralizedBuchi',
        'ba': 'Buchi',  # historical
        'buchi': 'Buchi',
        'cobuchi': 'CoBuchi',
        'nca': 'CoBuchi',
        'dca': 'CoBuchi',
        'parity min odd': 'ParityMinOdd',
        'parity min even': 'ParityMinEven',
        'parity max odd': 'ParityMaxOdd',
        'parity max even': 'ParityMaxEven',
        'parity min': 'ParityMin',
        'parity max': 'ParityMax',
        'parity odd': 'ParityOdd',
        'parity even': 'ParityEven',
        'parity': 'Parity',
        'monitor': 'Monitor',
    }

    def type_set(val):
        # sbac_ and implied_pref_ must be declared nonlocal as well:
        # otherwise the assignments below only create local variables
        # and the 'ba' and 'dca' side effects are silently lost.
        nonlocal type_, type_name_, sbac_, implied_pref_
        if type_ is not None and type_name_ != val:
            raise ValueError("type cannot be both {} and {}"
                             .format(type_name_, val))
        assert(val in type_attrs)
        type_ = getattr(postprocessor, type_attrs[val])
        if val == 'ba':
            sbac_ = postprocessor.SBAcc
        elif val == 'dca':
            implied_pref_ = postprocessor.Deterministic
        type_name_ = val

    def pref_set(val):
        nonlocal pref_, pref_name_
        if pref_ is not None and pref_name_ != val:
            raise ValueError("preference cannot be both {} and {}"
                             .format(pref_name_, val))
        elif val == 'small':
            pref_ = postprocessor.Small
        elif val == 'deterministic':
            pref_ = postprocessor.Deterministic
        else:
            assert(val == 'any')
            pref_ = postprocessor.Any
        pref_name_ = val

    def optm_set(val):
        nonlocal optm_, optm_name_
        if optm_ is not None and optm_name_ != val:
            raise ValueError("optimization level cannot be both {} and {}"
                             .format(optm_name_, val))
        if val == 'high':
            optm_ = postprocessor.High
        elif val.startswith('med'):
            optm_ = postprocessor.Medium
        else:
            assert(val == 'low')
            optm_ = postprocessor.Low
        optm_name_ = val

    def misc_set(val):
        nonlocal comp_, unam_, sbac_, colo_
        if val == 'colored':
            colo_ = postprocessor.Colored
        elif val == 'complete':
            comp_ = postprocessor.Complete
        elif (val == 'sbacc' or val == 'statebasedacceptance'
              or val == 'state-based-acceptance'):
            sbac_ = postprocessor.SBAcc
        else:
            assert(val == 'unambiguous')
            unam_ = postprocessor.Unambiguous

    options = {
        'any': pref_set,
        'ba': type_set,
        'buchi': type_set,
        'cobuchi': type_set,
        'colored': misc_set,
        'complete': misc_set,
        'dca': type_set,
        'deterministic': pref_set,
        'g': type_set,
        'gen': type_set,
        'generic': type_set,
        'generalizedbuchi': type_set,
        'high': optm_set,
        'low': optm_set,
        'medium': optm_set,
        'monitor': type_set,
        'nca': type_set,
        'parity even': type_set,
        'parity max even': type_set,
        'parity max odd': type_set,
        'parity max': type_set,
        'parity min even': type_set,
        'parity min odd': type_set,
        'parity min': type_set,
        'parity odd': type_set,
        'parity': type_set,
        'sbacc': misc_set,
        'small': pref_set,
        'statebasedacceptance': misc_set,
        'state-based-acceptance': misc_set,
        'tgba': type_set,
        'unambiguous': misc_set,
    }

    for arg in args:
        arg = arg.lower()
        fn = options.get(arg)
        if fn:
            fn(arg)
            continue
        # arg is not a known option, but maybe it is a prefix of
        # one of them
        compat = [key for key in options if key.startswith(arg)]
        if len(compat) == 1:
            options[compat[0]](compat[0])
        elif not compat:
            raise ValueError("unknown option '{}'".format(arg))
        else:
            raise ValueError("ambiguous option '{}' is prefix of {}"
                             .format(arg, str(compat)))

    if type_ is None:
        type_ = default_type
    if pref_ is None:
        pref_ = (postprocessor.Small if implied_pref_ is None
                 else implied_pref_)
    if optm_ is None:
        optm_ = postprocessor.High

    obj.set_type(type_)
    obj.set_pref(pref_ | comp_ | unam_ | sbac_ | colo_)
    obj.set_level(optm_)
|
|
|
|
|
|
def translate(formula, *args, dict=_bdd_dict, xargs=None):
    """Translate a formula into an automaton.

    `formula` may be a string, in which case it is parsed first.

    Keep in mind that 'Deterministic' expresses just a preference that
    may not be satisfied.

    The optional arguments should be strings among the following:
    - at most one in 'GeneralizedBuchi', 'Buchi', or 'Monitor',
      'generic', 'parity', 'parity min odd', 'parity min even',
      'parity max odd', 'parity max even', 'coBuchi'
      (type of acceptance condition to build)
    - at most one in 'Small', 'Deterministic', 'Any'
      (preferred characteristics of the produced automaton)
    - at most one in 'Low', 'Medium', 'High'
      (optimization level)
    - any combination of 'Complete', 'Unambiguous',
      'StateBasedAcceptance' (or 'SBAcc' for short), and
      'Colored' (only for parity acceptance)

    The default corresponds to 'tgba', 'small' and 'high'.

    Additional options can be supplied using a `spot.option_map`, or a
    string (that will be converted to `spot.option_map`), as the `xargs`
    argument.  This is similar to the `-x` option of command-line tools;
    so check out the spot-x(7) man page for details.
    """
    if type(xargs) is str:
        xargs = option_map(xargs)
    trans = translator(dict, xargs)
    _postproc_translate_options(trans, postprocessor.TGBA, *args)
    if type(formula) == str:
        formula = parse_formula(formula)
    aut = trans.run(formula)
    # Give the option map a chance to report the options nobody used.
    if xargs:
        xargs.report_unused_options()
    return aut


formula.translate = translate
|
|
|
|
|
|
# Wrap C++-functions into lambdas so that they get converted into
# instance methods (i.e., self passed as first argument
# automatically), because only user-defined functions are converted as
# instance methods.
def _add_formula(meth, name=None):
    """Expose the module-level function `meth` as a method of formula.

    The method is called `name` when given, and `meth` otherwise.  The
    function is looked up in globals() each time the method is called.
    """
    forward = (lambda self, *args, **kwargs:
               globals()[meth](self, *args, **kwargs))
    attr = name or meth
    setattr(formula, attr, forward)


_add_formula('contains')
_add_formula('are_equivalent', 'equivalent_to')
|
|
|
|
|
|
def postprocess(automaton, *args, formula=None, xargs=None):
    """Post process an automaton.

    `automaton` is either an automaton object or a string; a string
    is first converted by the module-level `automaton()` function.

    This applies a number of simplification algorithms, depending on
    the options supplied.  Keep in mind that 'Deterministic' expresses
    just a preference that may not be satisfied if the input is
    not already 'Deterministic'.

    The optional positional arguments should be strings among the
    following:
    - at most one in 'Generic', 'GeneralizedBuchi', 'Buchi', or
      'Monitor', 'parity', 'parity min odd', 'parity min even',
      'parity max odd', 'parity max even', 'coBuchi'
      (type of acceptance condition to build)
    - at most one in 'Small', 'Deterministic', 'Any'
      (preferred characteristics of the produced automaton)
    - at most one in 'Low', 'Medium', 'High'
      (optimization level)
    - any combination of 'Complete', 'StateBasedAcceptance'
      (or 'SBAcc' for short), and 'Colored' (only for parity
      acceptance)

    The default corresponds to 'generic', 'small' and 'high'.

    If a formula denoted by this automaton is known, pass it as the
    optional `formula` argument; it can help some algorithms by
    providing an easy way to complement the automaton.

    Additional options can be supplied using a `spot.option_map`, or a
    string (that will be converted to `spot.option_map`), as the `xargs`
    argument.  This is similar to the `-x` option of command-line tools;
    so check out the spot-x(7) man page for details.
    """
    extra = option_map(xargs) if type(xargs) is str else xargs
    post = postprocessor(extra)
    if type(automaton) == str:
        # The parameter shadows the module-level automaton() function,
        # hence the lookup through globals().
        automaton = globals()['automaton'](automaton)
    _postproc_translate_options(post, postprocessor.Generic, *args)
    simplified = post.run(automaton, formula)
    if extra:
        # Let the option map complain about options nobody consumed.
        extra.report_unused_options()
    return simplified
|
|
|
|
|
|
# Make postprocess() also callable as a method of twa objects; the
# automaton is then passed as the first argument.
twa.postprocess = postprocess
|
|
|
|
|
|
# C++ functions have to be wrapped into Python functions before being
# attached to a class: only user-defined functions are converted into
# instance methods (i.e., get self passed automatically as their
# first argument).
def _add_twa_graph(meth, name=None):
    """Attach the module-level function named `meth` to `twa_graph`.

    The method is installed under `name`, or under `meth` when `name`
    is not given.  The target function is looked up in globals() each
    time the method is called.
    """
    def method(self, *args, **kwargs):
        return globals()[meth](self, *args, **kwargs)

    setattr(twa_graph, name or meth, method)
|
|
|
|
|
|
# Expose each of these module-level functions as a twa_graph method
# of the same name.
for meth in ('scc_filter', 'scc_filter_states',
             'is_deterministic', 'is_unambiguous',
             'contains', 'get_strategy',
             'set_state_players', 'get_state_players',
             'set_state_player', 'get_state_player',
             'get_state_winners', 'get_state_winner'):
    _add_twa_graph(meth)
# are_equivalent() is exposed under the method name equivalent_to().
_add_twa_graph('are_equivalent', 'equivalent_to')
|
|
|
|
# Wrapper around a formula iterator to which we add some methods of formula
# (using _addfilter and _addmap), so that we can write things like
# formulas.simplify().is_X_free().


class formulaiterator:
    """Iterator over formulas that can be extended with chainable methods.

    The class itself only forwards iteration to the wrapped object;
    filter and map methods are attached to it after its definition.
    """

    def __init__(self, formulas):
        """Wrap `formulas`, which may be any iterable or iterator."""
        # iter() returns an iterator unchanged, so existing callers see
        # no difference, while plain iterables (e.g. a list) now work
        # instead of failing in next().
        self._formulas = iter(formulas)

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._formulas)
|
|
|
|
|
|
# fun should be a predicate and should be a method of formula.
# _addfilter adds this predicate as a filter with the same name in
# formulaiterator.
def _addfilter(fun):
    """Add filter `fun` and its negation as methods of formulaiterator.

    The negated filter is named 'is_not_...' for an 'is_...' predicate,
    'has_no_...' for a 'has_...' predicate, and 'not_' + fun otherwise.
    Both methods return a new formulaiterator.
    """
    def filtf(self, *args, **kwargs):
        def accept(f):
            return getattr(f, fun)(*args, **kwargs)
        return formulaiterator(filter(accept, self))

    def nfiltf(self, *args, **kwargs):
        def reject(f):
            return not getattr(f, fun)(*args, **kwargs)
        return formulaiterator(filter(reject, self))

    if fun.startswith('is_'):
        negated = 'is_not_' + fun[3:]
    elif fun.startswith('has_'):
        negated = 'has_no_' + fun[4:]
    else:
        negated = 'not_' + fun
    setattr(formulaiterator, fun, filtf)
    setattr(formulaiterator, negated, nfiltf)
|
|
|
|
|
|
# fun should be the name of a module-level function taking a formula
# as its first parameter.  _addmap adds this function as a method of
# formula and formulaiterator.
def _addmap(fun):
    """Add `fun` as a method of both formula and formulaiterator.

    On a formula, the method calls the module-level function `fun`
    (looked up in globals() at call time) with the formula as first
    argument.  On a formulaiterator, it returns a new formulaiterator
    that applies the method of that name to each element.
    """
    def mapf(self, *args, **kwargs):
        def apply(f):
            return getattr(f, fun)(*args, **kwargs)
        return formulaiterator(map(apply, self))

    def method(self, *args, **kwargs):
        return globals()[fun](self, *args, **kwargs)

    setattr(formula, fun, method)
    setattr(formulaiterator, fun, mapf)
|
|
|
|
|
|
def randltl(ap, n=-1, **kwargs):
    """Generate random formulas.

    Returns a random formula iterator.  Returns None instead when
    `dump_priorities` is set, or when the `simplify` level is invalid
    (a message is then written to stderr).

    ap: the number of atomic propositions used to generate random
        formulas, or a list whose elements are turned into atomic
        propositions with formula.ap().

    n: number of formulas to generate, or unbounded if n < 0.

    **kwargs:
    seed: seed for the random number generator (0).
    output: can be 'ltl', 'psl', 'bool' or 'sere' ('ltl').
    allow_dups: allow duplicate formulas (False).
    tree_size: tree size of the formulas generated, before mandatory
               simplifications (15).  A (min, max) tuple may be given
               instead of a single size.
    weak_fairness: value of the generator's "wf" option (False).
    boolean_priorities: set priorities for Boolean formulas.
    ltl_priorities: set priorities for LTL formulas.
    sere_priorities: set priorities for SERE formulas.
    dump_priorities: show current priorities, do not generate any formula.
    simplify:
      0           No rewriting (default)
      1           basic rewritings and eventual/universal rules
      2           additional syntactic implication rules
      3           better implications using containment
    """
    opts = option_map()
    output_map = {
        "ltl": randltlgenerator.LTL,
        "psl": randltlgenerator.PSL,
        "bool": randltlgenerator.Bool,
        "sere": randltlgenerator.SERE
    }

    if isinstance(ap, list):
        aprops = atomic_prop_set()
        for elt in ap:
            aprops.insert(formula.ap(elt))
        ap = aprops
    ltl_priorities = kwargs.get("ltl_priorities", None)
    sere_priorities = kwargs.get("sere_priorities", None)
    boolean_priorities = kwargs.get("boolean_priorities", None)
    output = output_map[kwargs.get("output", "ltl")]
    opts.set("output", output)
    opts.set("seed", kwargs.get("seed", 0))
    tree_size = kwargs.get("tree_size", 15)
    if isinstance(tree_size, tuple):
        tree_size_min, tree_size_max = tree_size
    else:
        tree_size_min = tree_size_max = tree_size
    opts.set("tree_size_min", tree_size_min)
    opts.set("tree_size_max", tree_size_max)
    opts.set("unique", not kwargs.get("allow_dups", False))
    opts.set("wf", kwargs.get("weak_fairness", False))
    simpl_level = kwargs.get("simplify", 0)
    if simpl_level > 3 or simpl_level < 0:
        # The level is an int: it must be converted before being
        # concatenated, otherwise this error path raises TypeError.
        sys.stderr.write('invalid simplification level: '
                         + str(simpl_level))
        return
    opts.set("simplification_level", simpl_level)

    rg = randltlgenerator(ap, opts, ltl_priorities, sere_priorities,
                          boolean_priorities)

    if kwargs.get("dump_priorities", False):
        def dump(header, dumper):
            # Each table gets its own stream: str() returns everything
            # written so far, so a shared stream would print the
            # previously dumped tables again.
            print(header)
            stream = ostringstream()
            dumper(stream)
            print(stream.str())

        ltl_header = ('Use argument ltl_priorities=STRING to set the '
                      'following LTL priorities:\n')
        sere_header = ('Use argument sere_priorities=STRING to set the '
                       'following SERE priorities:\n')
        bool_header = ('Use argument boolean_priorities=STRING to set the '
                       'following Boolean formula priorities:\n')

        if output == randltlgenerator.LTL:
            dump(ltl_header, rg.dump_ltl_priorities)
        elif output == randltlgenerator.Bool:
            dump(bool_header, rg.dump_bool_priorities)
        elif output == randltlgenerator.PSL or output == randltlgenerator.SERE:
            if output != randltlgenerator.SERE:
                dump(ltl_header, rg.dump_psl_priorities)
            dump(sere_header, rg.dump_sere_priorities)
            dump(bool_header, rg.dump_sere_bool_priorities)
        else:
            sys.stderr.write("internal error: unknown type of output")
        # Dumping priorities replaces formula generation.
        return

    class _randltliterator:
        """Iterator drawing at most `n` formulas from generator `rg`."""

        def __init__(self, rg, n):
            self.rg = rg
            self.i = 0
            self.n = n

        def __iter__(self):
            return self

        def __next__(self):
            # A negative n never equals the counter: unbounded iteration.
            if self.i == self.n:
                raise StopIteration
            f = self.rg.next()
            if f is None:
                sys.stderr.write("Warning: could not generate a new "
                                 "unique formula after {} trials.\n"
                                 .format(randltlgenerator.MAX_TRIALS))
                raise StopIteration
            self.i += 1
            return f

    return formulaiterator(_randltliterator(rg, n))
|
|
|
|
|
|
def simplify(f, **kwargs):
    """Simplify formula `f` with a tl_simplifier.

    If the keyword `level` is given (and is not None), the simplifier
    options are built from that level alone and every other keyword is
    ignored.  Otherwise the options are built from the following
    keywords, listed with their default values:

    basics (True), synt_impl (True), event_univ (True),
    containment_checks (False), containment_checks_stronger (False),
    nenoform_stop_on_boolean (False), reduce_size_strictly (False),
    boolean_to_isop (False), favor_event_univ (False).
    """
    level = kwargs.get('level', None)
    if level is not None:
        return tl_simplifier(tl_simplifier_options(level)).simplify(f)

    # Keyword names and defaults, in the positional order expected by
    # tl_simplifier_options().
    defaults = (
        ('basics', True),
        ('synt_impl', True),
        ('event_univ', True),
        ('containment_checks', False),
        ('containment_checks_stronger', False),
        ('nenoform_stop_on_boolean', False),
        ('reduce_size_strictly', False),
        ('boolean_to_isop', False),
        ('favor_event_univ', False),
    )
    flags = [kwargs.get(key, default) for key, default in defaults]
    return tl_simplifier(tl_simplifier_options(*flags)).simplify(f)
|
|
|
|
|
|
# Every callable attribute of formula whose name starts with 'is_' or
# 'has_' becomes a filter (plus its negation) on formulaiterator.
for fun in dir(formula):
    if callable(getattr(formula, fun)) and fun.startswith(('is_', 'has_')):
        _addfilter(fun)

# These module-level functions become methods of both formula and
# formulaiterator.
for fun in ('remove_x', 'relabel', 'relabel_bse',
            'simplify', 'unabbreviate', 'negative_normal_form',
            'mp_class', 'nesting_depth'):
    _addmap(fun)
|
|
|
|
|
|
# Better interface to the corresponding C++ function.
def sat_minimize(aut, acc=None, colored=False,
                 state_based=False, states=0,
                 max_states=0, sat_naive=False, sat_langmap=False,
                 sat_incr=0, sat_incr_steps=0,
                 display_log=False, return_log=False):
    """Call spot.impl.sat_minimize() using keyword arguments.

    The keyword arguments are assembled into the comma-separated
    option string that is passed to the C++ function together with
    `aut` and `state_based`:

    acc: acceptance condition, as a string (emitted as acc="...").
    colored: if true, add the 'colored' option.
    states, max_states: non-negative integers; emitted as 'states=N'
        and 'max-states=N' when non-zero.
    sat_naive, sat_langmap: if true, add 'sat-naive' / 'sat-langmap'.
    sat_incr: when non-zero, 'sat-incr=' and 'sat-incr-steps=' (from
        `sat_incr_steps`) are both added.
    display_log: show the log produced by the C++ function with
        IPython's display(), without its 'automaton' column.
    return_log: also return the log, as a pandas DataFrame.

    Returns the result of the C++ function, or the pair
    (result, log) when `return_log` is true.  Note that when
    `display_log` is also true, the returned log has already lost its
    'automaton' column.

    Raises ValueError if `acc` is not a string, or if `states` or
    `max_states` is not a non-negative int.
    """
    args = ''
    if acc is not None:
        if type(acc) is not str:
            raise ValueError("argument 'acc' should be a string")
        args += ',acc="' + acc + '"'
    if colored:
        args += ',colored'
    if states:
        if type(states) is not int or states < 0:
            raise ValueError("argument 'states' should be a positive integer")
        args += ',states=' + str(states)
    if max_states:
        if type(max_states) is not int or max_states < 0:
            # Name the argument that is actually being validated.
            raise ValueError(
                "argument 'max_states' should be a positive integer")
        args += ',max-states=' + str(max_states)
    if sat_naive:
        args += ',sat-naive'
    if sat_langmap:
        args += ',sat-langmap'
    if sat_incr:
        args += ',sat-incr=' + str(sat_incr)
        args += ',sat-incr-steps=' + str(sat_incr_steps)
    from spot.impl import sat_minimize as sm

    if display_log or return_log:
        import pandas as pd
        # The log is written to a temporary file of the current
        # directory, and read back before the file is removed.
        with tempfile.NamedTemporaryFile(dir='.', suffix='.satlog') as t:
            args += ',log="{}"'.format(t.name)
            aut = sm(aut, args, state_based)
            dfrm = pd.read_csv(t.name, dtype=object)
        if display_log:
            # old versions of ipython do not import display by default
            from IPython.display import display
            del dfrm['automaton']
            display(dfrm)
        if return_log:
            return aut, dfrm
        else:
            return aut
    else:
        return sm(aut, args, state_based)
|
|
|
|
|
|
def parse_word(word, dic=_bdd_dict):
    """Call spot.impl.parse_word() on `word`.

    `dic` is the BDD dictionary passed along; it defaults to the
    module-wide `_bdd_dict`.
    """
    from spot.impl import parse_word as _impl_parse_word
    return _impl_parse_word(word, dic)
|
|
|
|
|
|
def bdd_to_formula(b, dic=_bdd_dict):
    """Call spot.impl.bdd_to_formula() on the BDD `b`.

    `dic` is the BDD dictionary passed along; it defaults to the
    module-wide `_bdd_dict`.
    """
    from spot.impl import bdd_to_formula as _impl_bdd_to_formula
    return _impl_bdd_to_formula(b, dic)
|
|
|
|
|
|
def language_containment_checker(dic=_bdd_dict):
    """Build a spot.impl.language_containment_checker using `dic`.

    Before instantiation, the class receives two extra methods:
    contains(a, b), which calls contained(b, a), and
    are_equivalent(a, b), which calls equal(a, b).  The methods are
    (re)installed on the class at every call of this function.
    """
    from spot.impl import language_containment_checker as checker

    def contains(this, a, b):
        # Same test as contained(), with the operands swapped.
        return checker.contained(this, b, a)

    def are_equivalent(this, a, b):
        return checker.equal(this, a, b)

    checker.contains = contains
    checker.are_equivalent = are_equivalent
    return checker(dic)
|
|
|
|
|
|
def mp_hierarchy_svg(cl=None):
    """
    Return a string containing an SVG picture of the Manna &
    Pnueli hierarchy, highlighting class `cl` if given.

    If not None, `cl` should be one of the letters 'T', 'R', 'P',
    'O', 'S', 'G', 'B'; the corresponding class is marked with a red
    cross.  Any other value produces the picture without highlight.
    For convenience, if `cl` is an instance of `spot.formula`, it is
    replaced by `mp_class(cl)`.
    """
    if isinstance(cl, formula):
        cl = mp_class(cl)
    # Position of the cross for each class letter.
    coords = {
        'T': '110,35',
        'R': '40,80',
        'P': '175,80',
        'O': '110,140',
        'S': '40,160',
        'G': '175,160',
        'B': '110,198',
    }
    if cl in coords:
        highlight = '''<g transform="translate({})">
<line x1="-10" y1="-10" x2="10" y2="10" stroke="red" stroke-width="5" />
<line x1="-10" y1="10" x2="10" y2="-10" stroke="red" stroke-width="5" />
</g>'''.format(coords[cl])
    else:
        highlight = ''
    # text-anchor only accepts start, middle and end; "start" is the
    # valid spelling of what "begin" was meant to request.
    return '''
<svg height="210" width="220" xmlns="http://www.w3.org/2000/svg" version="1.1">
<polygon points="20,0 200,120 200,210 20,210" fill="cyan" opacity=".2" />
<polygon points="20,120 155,210 20,210" fill="cyan" opacity=".2" />
<polygon points="200,0 20,120 20,210 200,210" fill="magenta" opacity=".15" />
<polygon points="200,120 65,210 200,210" fill="magenta" opacity=".15" />
''' + highlight + '''
<g text-anchor="middle" font-size="14">
<text x="110" y="20">Reactivity</text>
<text x="60" y="65">Recurrence</text>
<text x="160" y="65">Persistence</text>
<text x="110" y="125">Obligation</text>
<text x="60" y="185">Safety</text>
<text x="160" y="185">Guarantee</text>
</g>
<g font-size="14">
<text text-anchor="start" transform="rotate(-90,18,210)" x="18" y="210" fill="gray">Monitor</text>
<text text-anchor="end" transform="rotate(-90,18,0)" x="18" y="0" fill="gray">Deterministic Büchi</text>
<text text-anchor="start" transform="rotate(-90,214,210)" x="214" y="210" fill="gray">Terminal Büchi</text>
<text text-anchor="end" transform="rotate(-90,214,0)" x="214" y="0" fill="gray">Weak Büchi</text>
</g>
</svg>'''
|
|
|
|
|
|
def show_mp_hierarchy(cl):
    """
    Return the picture built by `mp_hierarchy_svg(cl)` wrapped in a
    `spot.jupyter.SVG` object, for display in IPython/Jupyter.
    """
    from spot.jupyter import SVG
    picture = mp_hierarchy_svg(cl)
    return SVG(picture)
|
|
|
|
|
|
# Make show_mp_hierarchy() also callable as a method of formula
# objects; the formula is then passed as `cl`.
formula.show_mp_hierarchy = show_mp_hierarchy
|
|
|
|
|
|
@_extend(twa_word)
class twa_word:
    def _repr_latex_(self):
        """Return the word as a LaTeX math string.

        The letters of the prefix and of the cycle are separated by
        '; ', and the cycle is enclosed in \\mathsf{cycle}\\{...\\}.
        """
        bd = self.get_dict()
        res = '$'
        for idx, letter in enumerate(self.prefix):
            if idx:
                res += '; '
            res += bdd_to_formula(letter, bd).to_str('j')
        # Separate the prefix from the cycle only if a prefix was output.
        if len(res) > 1:
            res += '; '
        res += '\\mathsf{cycle}\\{'
        for idx, letter in enumerate(self.cycle):
            if idx:
                res += '; '
            res += bdd_to_formula(letter, bd).to_str('j')
        return res + '\\}$'

    def as_svg(self):
        """
        Build an SVG picture representing the word as a collection of
        signals for each atomic proposition.

        Each atomic proposition gets one row and each letter one
        column, both 50 units wide; the cycle is drawn twice.  Returns
        the SVG source as a string.
        """
        from html import escape

        # Get the list of atomic propositions used
        sup = buddy.bddtrue
        for cond in list(self.prefix) + list(self.cycle):
            sup = sup & buddy.bdd_support(cond)
        ap = []
        while sup != buddy.bddtrue:
            ap.append(buddy.bdd_var(sup))
            sup = buddy.bdd_high(sup)

        # Prepare canvas
        psize = len(self.prefix)
        csize = len(self.cycle)
        d = {
            'endprefix': 50 * psize,
            'endcycle': 50 * (psize + csize),
            'w': 50 * (psize + csize * 2),
            'height': 50 * len(ap),
            'height2': 50 * len(ap) + 10,
            'h3': 50 * len(ap) + 12,
            'bgcolor': '#f4f4f4',
            'bgl': 'stroke="white" stroke-width="4"',
            'bgt': 'stroke="white" stroke-width="1"',
            'txt': 'text-anchor="start" font-size="20"',
            'red': 'stroke="#ff0000" stroke-width="2"',
            'sml': 'text-anchor="start" font-size="10"'
        }
        # The fragments are collected in a list and joined once at the end.
        parts = ['''
<svg height="{h3}" width="{w}" xmlns="http://www.w3.org/2000/svg" version="1.1">
<rect x="0" y="0" width="{w}" height="{height}" fill="{bgcolor}"/>
<line x1="{endprefix}" y1="0" x2="{endprefix}" y2="{height}"
stroke="white" stroke-width="4"/>
<line x1="{endcycle}" y1="0" x2="{endcycle}" y2="{height}"
stroke="white" stroke-width="4"/>
'''.format(**d)]

        # Iterate over all used atomic propositions, and fill each line
        steps = list(self.prefix) + list(self.cycle) + list(self.cycle)
        bd = self.get_dict()
        for ypos, a in enumerate(ap):
            pa = buddy.bdd_ithvar(a)
            na = buddy.bdd_nithvar(a)
            # The name ends up as XML character data: escape &, < and >
            # so that such characters cannot break the SVG.
            name = escape(bdd_format_formula(bd, pa), quote=False)
            # Whether the last state was down (-1), up (1), or unknown (0)
            last = 0
            parts.append('<line x1="0" y1="{y}" x2="{w}" y2="{y}" {bgl}/>'
                         .format(y=ypos*50, **d))
            parts.append('<text x="{x}" y="{y}" {txt}>{name}</text>'
                         .format(x=3, y=ypos*50+30, name=name, **d))
            for xpos, step in enumerate(steps):
                if buddy.bdd_implies(step, pa):
                    cur = 1
                elif buddy.bdd_implies(step, na):
                    cur = -1
                else:
                    cur = 0
                # Thin separator at the right of the current column.
                parts.append(
                    '<line x1="{x}" y1="{y1}" x2="{x}" y2="{y2}" {bgt}/>'
                    .format(x=(xpos+1)*50, y1=ypos*50, y2=ypos*50+50, **d))
                if cur != 0:
                    if last == -cur:
                        # The signal flipped: draw the vertical edge.
                        parts.append(
                            '<line x1="{x}" y1="{y1}" x2="{x}" y2="{y2}" {red}/>'
                            .format(x=xpos*50, y1=ypos*50+5,
                                    y2=ypos*50+45, **d))
                    # Horizontal segment, high for up and low for down.
                    parts.append(
                        '<line x1="{x1}" y1="{y}" x2="{x2}" y2="{y}" {red}/>'
                        .format(x1=xpos*50, x2=(xpos+1)*50,
                                y=ypos*50+25-20*cur, **d))
                last = cur
        if psize > 0:
            parts.append(
                '<text x="0" y="{height2}" {sml}>prefix</text>'.format(**d))
        parts.append('''<text x="{endprefix}" y="{height2}" {sml}>cycle</text>
<text x="{endcycle}" y="{height2}" {sml}>cycle</text>'''.format(**d))
        parts.append('</svg>')
        return ''.join(parts)

    def show(self):
        """
        Display the word as an SVG picture of signals.
        """
        from spot.jupyter import SVG
        return SVG(self.as_svg())
|
|
|
|
|
|
# Make scc_and_mark_filter usable as a context manager
@_extend(scc_and_mark_filter)
class scc_and_mark_filter:
    def __enter__(self):
        # The filter itself is the object bound by `with ... as`.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Called when the `with` block is left, with or without an
        # exception; nothing is returned, so exceptions propagate.
        self.restore_acceptance()
|
|
|
|
|
|
def to_parity(aut, options=None, **kwargs):
    """Convert aut into a parity acceptance.

    This procedure combines multiple strategies to attempt to
    produce a small parity automaton.  The resulting parity
    acceptance is either "max odd" or "max even".

    The default optimization options may be altered by passing either
    an instance of to_parity_options as the `options` argument, or by
    passing additional kwargs that will be used to update options.
    Only kwargs naming a public attribute of to_parity_options (other
    than `thisown`) are used; any other keyword is silently ignored.

    Note that if you pass both your own options object and kwargs,
    options will be updated in place.
    """
    if options is None:
        # Can't make this a default argument, otherwise the default
        # instance would be updated by the kwargs.
        options = to_parity_options()
    if kwargs:
        for key in to_parity_options.__dict__:
            if key.startswith('_') or key == "thisown" or key not in kwargs:
                continue
            setattr(options, key, kwargs[key])
    return impl.to_parity(aut, options)
|