python: get ipnbdoctest to work with ipython 3.0
With great help from the first two commits in https://github.com/paulgb/runipy/pull/49/commits * wrap/python/tests/ipnbdoctest.py: Update.
This commit is contained in:
parent
2362b9ab68
commit
2d13fd50ab
1 changed files with 45 additions and 34 deletions
|
|
@ -34,8 +34,8 @@ try:
|
|||
except ImportError:
|
||||
from IPython.zmq.blockingkernelmanager import BlockingKernelManager as KernelManager
|
||||
|
||||
from IPython.nbformat.current import reads, NotebookNode
|
||||
|
||||
# Until Debian ships IPython 3.0, we stick to the v3 format.
|
||||
from IPython.nbformat import v3 as nbformat
|
||||
|
||||
def compare_png(a64, b64):
|
||||
"""compare two b64 PNGs (incomplete)"""
|
||||
|
|
@ -61,8 +61,8 @@ def sanitize(s):
|
|||
# ignore trailing newlines (but not space)
|
||||
s = s.rstrip('\n')
|
||||
|
||||
# normalize hex addresses:
|
||||
s = re.sub(r'0x[a-f0-9]+', '0xFFFFFFFF', s)
|
||||
# remove hex addresses:
|
||||
s = re.sub(r'at 0x[a-f0-9]+', 'object', s)
|
||||
|
||||
# normalize UUIDs:
|
||||
s = re.sub(r'[a-f0-9]{8}(\-[a-f0-9]{4}){3}\-[a-f0-9]{12}', 'U-U-I-D', s)
|
||||
|
|
@ -108,33 +108,54 @@ def compare_outputs(test, ref, skip_cmp=('png', 'traceback',
|
|||
return False
|
||||
return True
|
||||
|
||||
def _wait_for_ready_backport(kf):
|
||||
"""Backport BlockingKernelClient.wait_for_ready from IPython 3"""
|
||||
# Wait for kernel info reply on shell channel
|
||||
kc.kernel_info()
|
||||
while True:
|
||||
msg = kc.get_shell_msg(block=True, timeout=30)
|
||||
if msg['msg_type'] == 'kernel_info_reply':
|
||||
break
|
||||
# Flush IOPub channel
|
||||
while True:
|
||||
try:
|
||||
msg = kc.get_iopub_msg(block=True, timeout=0.2)
|
||||
except Empty:
|
||||
break
|
||||
|
||||
def run_cell(shell, iopub, cell):
|
||||
def run_cell(kc, cell):
|
||||
# print cell.input
|
||||
shell.execute(cell.input)
|
||||
kc.execute(cell.input)
|
||||
# wait for finish, maximum 20s
|
||||
shell.get_msg(timeout=20)
|
||||
kc.get_shell_msg(timeout=20)
|
||||
outs = []
|
||||
|
||||
while True:
|
||||
try:
|
||||
msg = iopub.get_msg(timeout=0.2)
|
||||
msg = kc.get_iopub_msg(timeout=0.2)
|
||||
except Empty:
|
||||
break
|
||||
msg_type = msg['msg_type']
|
||||
if msg_type in ('status', 'pyin'):
|
||||
if msg_type in ('status', 'pyin', 'execute_input'):
|
||||
continue
|
||||
elif msg_type == 'clear_output':
|
||||
outs = []
|
||||
continue
|
||||
|
||||
content = msg['content']
|
||||
# print msg_type, content
|
||||
out = NotebookNode(output_type=msg_type)
|
||||
# print (msg_type, content)
|
||||
if msg_type == 'execute_result':
|
||||
msg_type = 'pyout'
|
||||
elif msg_type == 'error':
|
||||
msg_type = 'pyerr'
|
||||
out = nbformat.NotebookNode(output_type=msg_type)
|
||||
|
||||
if msg_type == 'stream':
|
||||
out.stream = content['name']
|
||||
out.text = content['data']
|
||||
if 'text' in content:
|
||||
out.text = content['text']
|
||||
else:
|
||||
out.text = content['data']
|
||||
elif msg_type in ('display_data', 'pyout'):
|
||||
out['metadata'] = content['metadata']
|
||||
for mime, data in content['data'].items():
|
||||
|
|
@ -142,7 +163,7 @@ def run_cell(shell, iopub, cell):
|
|||
# this gets most right, but fix svg+html, plain
|
||||
attr = attr.replace('+xml', '').replace('plain', 'text')
|
||||
setattr(out, attr, data)
|
||||
if msg_type == 'pyout':
|
||||
if 'execution_count' in content:
|
||||
out.prompt_number = content['execution_count']
|
||||
elif msg_type == 'pyerr':
|
||||
out.ename = content['ename']
|
||||
|
|
@ -156,29 +177,19 @@ def run_cell(shell, iopub, cell):
|
|||
|
||||
|
||||
def test_notebook(nb):
|
||||
# run %pylab inline, because some notebooks assume this
|
||||
# even though they shouldn't
|
||||
km = KernelManager()
|
||||
km.start_kernel(extra_arguments=['--pylab=inline'],
|
||||
stderr=open(os.devnull, 'w'))
|
||||
try:
|
||||
kc = km.client()
|
||||
kc.start_channels()
|
||||
iopub = kc.iopub_channel
|
||||
except AttributeError:
|
||||
# IPython 0.13
|
||||
kc = km
|
||||
kc.start_channels()
|
||||
iopub = kc.sub_channel
|
||||
shell = kc.shell_channel
|
||||
|
||||
# run %pylab inline, because some notebooks assume this
|
||||
# even though they shouldn't
|
||||
shell.execute("pass")
|
||||
shell.get_msg()
|
||||
while True:
|
||||
try:
|
||||
iopub.get_msg(timeout=1)
|
||||
except Empty:
|
||||
break
|
||||
kc = km.client()
|
||||
kc.start_channels()
|
||||
|
||||
try:
|
||||
kc.wait_for_ready()
|
||||
except AttributeError:
|
||||
_wait_for_ready_backport(kc)
|
||||
|
||||
successes = 0
|
||||
failures = 0
|
||||
|
|
@ -188,7 +199,7 @@ def test_notebook(nb):
|
|||
if cell.cell_type != 'code':
|
||||
continue
|
||||
try:
|
||||
outs = run_cell(shell, iopub, cell)
|
||||
outs = run_cell(kc, cell)
|
||||
except Exception as e:
|
||||
print("failed to run cell:", repr(e))
|
||||
print(cell.input)
|
||||
|
|
@ -228,5 +239,5 @@ if __name__ == '__main__':
|
|||
for ipynb in sys.argv[1:]:
|
||||
print("testing %s" % ipynb)
|
||||
with open(ipynb) as f:
|
||||
nb = reads(f.read(), 'json')
|
||||
nb = nbformat.reads_json(f.read())
|
||||
test_notebook(nb)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue