diff --git a/wrap/python/tests/ipnbdoctest.py b/wrap/python/tests/ipnbdoctest.py index 9a7d20cff..4b14f4400 100755 --- a/wrap/python/tests/ipnbdoctest.py +++ b/wrap/python/tests/ipnbdoctest.py @@ -34,8 +34,8 @@ try: except ImportError: from IPython.zmq.blockingkernelmanager import BlockingKernelManager as KernelManager -from IPython.nbformat.current import reads, NotebookNode - +# Until Debian ships IPython 3.0, we stick to the v3 format. +from IPython.nbformat import v3 as nbformat def compare_png(a64, b64): """compare two b64 PNGs (incomplete)""" @@ -61,8 +61,8 @@ def sanitize(s): # ignore trailing newlines (but not space) s = s.rstrip('\n') - # normalize hex addresses: - s = re.sub(r'0x[a-f0-9]+', '0xFFFFFFFF', s) + # remove hex addresses: + s = re.sub(r'at 0x[a-f0-9]+', 'object', s) # normalize UUIDs: s = re.sub(r'[a-f0-9]{8}(\-[a-f0-9]{4}){3}\-[a-f0-9]{12}', 'U-U-I-D', s) @@ -108,33 +108,54 @@ def compare_outputs(test, ref, skip_cmp=('png', 'traceback', return False return True +def _wait_for_ready_backport(kf): + """Backport BlockingKernelClient.wait_for_ready from IPython 3""" + # Wait for kernel info reply on shell channel + kc.kernel_info() + while True: + msg = kc.get_shell_msg(block=True, timeout=30) + if msg['msg_type'] == 'kernel_info_reply': + break + # Flush IOPub channel + while True: + try: + msg = kc.get_iopub_msg(block=True, timeout=0.2) + except Empty: + break -def run_cell(shell, iopub, cell): +def run_cell(kc, cell): # print cell.input - shell.execute(cell.input) + kc.execute(cell.input) # wait for finish, maximum 20s - shell.get_msg(timeout=20) + kc.get_shell_msg(timeout=20) outs = [] while True: try: - msg = iopub.get_msg(timeout=0.2) + msg = kc.get_iopub_msg(timeout=0.2) except Empty: break msg_type = msg['msg_type'] - if msg_type in ('status', 'pyin'): + if msg_type in ('status', 'pyin', 'execute_input'): continue elif msg_type == 'clear_output': outs = [] continue content = msg['content'] - # print msg_type, content - out = NotebookNode(output_type=msg_type) + # print (msg_type, content) + if msg_type == 'execute_result': + msg_type = 'pyout' + elif msg_type == 'error': + msg_type = 'pyerr' + out = nbformat.NotebookNode(output_type=msg_type) if msg_type == 'stream': out.stream = content['name'] - out.text = content['data'] + if 'text' in content: + out.text = content['text'] + else: + out.text = content['data'] elif msg_type in ('display_data', 'pyout'): out['metadata'] = content['metadata'] for mime, data in content['data'].items(): @@ -142,7 +163,7 @@ def run_cell(shell, iopub, cell): # this gets most right, but fix svg+html, plain attr = attr.replace('+xml', '').replace('plain', 'text') setattr(out, attr, data) - if msg_type == 'pyout': + if 'execution_count' in content: out.prompt_number = content['execution_count'] elif msg_type == 'pyerr': out.ename = content['ename'] @@ -156,29 +177,19 @@ def run_cell(shell, iopub, cell): def test_notebook(nb): + # run %pylab inline, because some notebooks assume this + # even though they shouldn't km = KernelManager() km.start_kernel(extra_arguments=['--pylab=inline'], stderr=open(os.devnull, 'w')) - try: - kc = km.client() - kc.start_channels() - iopub = kc.iopub_channel - except AttributeError: - # IPython 0.13 - kc = km - kc.start_channels() - iopub = kc.sub_channel - shell = kc.shell_channel - # run %pylab inline, because some notebooks assume this - # even though they shouldn't - shell.execute("pass") - shell.get_msg() - while True: - try: - iopub.get_msg(timeout=1) - except Empty: - break + kc = km.client() + kc.start_channels() + + try: + kc.wait_for_ready() + except AttributeError: + _wait_for_ready_backport(kc) successes = 0 failures = 0 @@ -188,7 +199,7 @@ def test_notebook(nb): if cell.cell_type != 'code': continue try: - outs = run_cell(shell, iopub, cell) + outs = run_cell(kc, cell) except Exception as e: print("failed to run cell:", repr(e)) print(cell.input) @@ -228,5 +239,5 @@ if __name__ == '__main__': for ipynb in sys.argv[1:]: print("testing %s" % ipynb) with open(ipynb) as f: - nb = reads(f.read(), 'json') + nb = nbformat.reads_json(f.read()) test_notebook(nb)