#!/usr/bin/env python """ simple example script for running and testing notebooks. Usage: `ipnbdoctest.py foo.ipynb [bar.ipynb [...]]` Each cell is submitted to the kernel, and the outputs are compared with those stored in the notebook. """ from __future__ import print_function import os,sys,time import base64 import re from difflib import unified_diff as diff from collections import defaultdict try: from queue import Empty except ImportError: print('Python 3.x is needed to run this script.') sys.exit(77) import imp try: imp.find_module('IPython') except: print('IPython is needed to run this script.') sys.exit(77) try: from IPython.kernel import KernelManager except ImportError: from IPython.zmq.blockingkernelmanager import BlockingKernelManager as KernelManager from IPython.nbformat.current import reads, NotebookNode def compare_png(a64, b64): """compare two b64 PNGs (incomplete)""" try: import Image except ImportError: pass adata = base64.decodestring(a64) bdata = base64.decodestring(b64) return True def sanitize(s): """sanitize a string for comparison. fix universal newlines, strip trailing newlines, and normalize likely random values (memory addresses and UUIDs) """ if not isinstance(s, str): return s # normalize newline: s = s.replace('\r\n', '\n') # ignore trailing newlines (but not space) s = s.rstrip('\n') # normalize hex addresses: s = re.sub(r'0x[a-f0-9]+', '0xFFFFFFFF', s) # normalize UUIDs: s = re.sub(r'[a-f0-9]{8}(\-[a-f0-9]{4}){3}\-[a-f0-9]{12}', 'U-U-I-D', s) return s def consolidate_outputs(outputs): """consolidate outputs into a summary dict (incomplete)""" data = defaultdict(list) data['stdout'] = '' data['stderr'] = '' for out in outputs: if out.type == 'stream': data[out.stream] += out.text elif out.type == 'pyerr': data['pyerr'] = dict(ename=out.ename, evalue=out.evalue) else: for key in ('png', 'svg', 'latex', 'html', 'javascript', 'text', 'jpeg',): if key in out: data[key].append(out[key]) return data def compare_outputs(test, ref, skip_cmp=('png', 'traceback', 'latex', 'prompt_number')): for key in ref: if key not in test: print("missing key: %s != %s" % (test.keys(), ref.keys())) return False elif key not in skip_cmp and sanitize(test[key]) != sanitize(ref[key]): print("mismatch %s:" % key) exp = ref[key] eff = test[key] if exp[:-1] != '\n': exp += '\n' if eff[:-1] != '\n': eff += '\n' print(''.join(diff(exp.splitlines(1), eff.splitlines(1), fromfile='expected', tofile='effective'))) return False return True def run_cell(shell, iopub, cell): # print cell.input shell.execute(cell.input) # wait for finish, maximum 20s shell.get_msg(timeout=20) outs = [] while True: try: msg = iopub.get_msg(timeout=0.2) except Empty: break msg_type = msg['msg_type'] if msg_type in ('status', 'pyin'): continue elif msg_type == 'clear_output': outs = [] continue content = msg['content'] # print msg_type, content out = NotebookNode(output_type=msg_type) if msg_type == 'stream': out.stream = content['name'] out.text = content['data'] elif msg_type in ('display_data', 'pyout'): out['metadata'] = content['metadata'] for mime, data in content['data'].items(): attr = mime.split('/')[-1].lower() # this gets most right, but fix svg+html, plain attr = attr.replace('+xml', '').replace('plain', 'text') setattr(out, attr, data) if msg_type == 'pyout': out.prompt_number = content['execution_count'] elif msg_type == 'pyerr': out.ename = content['ename'] out.evalue = content['evalue'] out.traceback = content['traceback'] else: print("unhandled iopub msg:", msg_type) outs.append(out) return outs def test_notebook(nb): km = KernelManager() km.start_kernel(extra_arguments=['--pylab=inline'], stderr=open(os.devnull, 'w')) try: kc = km.client() kc.start_channels() iopub = kc.iopub_channel except AttributeError: # IPython 0.13 kc = km kc.start_channels() iopub = kc.sub_channel shell = kc.shell_channel # run %pylab inline, because some notebooks assume this # even though they shouldn't shell.execute("pass") shell.get_msg() while True: try: iopub.get_msg(timeout=1) except Empty: break successes = 0 failures = 0 errors = 0 for ws in nb.worksheets: for i, cell in enumerate(ws.cells): if cell.cell_type != 'code': continue try: outs = run_cell(shell, iopub, cell) except Exception as e: print("failed to run cell:", repr(e)) print(cell.input) errors += 1 continue failed = False if len(outs) != len(cell.outputs): print("output length mismatch (expected {}, got {})".format( len(cell.outputs), len(outs))) failed = True for out, ref in zip(outs, cell.outputs): if not compare_outputs(out, ref): failed = True print("cell %d: " % i, end="") if failed: print("FAIL") failures += 1 else: print("OK") successes += 1 print() print("tested notebook %s" % nb.metadata.name) print(" %3i cells successfully replicated" % successes) if failures: print(" %3i cells mismatched output" % failures) if errors: print(" %3i cells failed to complete" % errors) kc.stop_channels() km.shutdown_kernel() del km if failures | errors: sys.exit(1) if __name__ == '__main__': for ipynb in sys.argv[1:]: print("testing %s" % ipynb) with open(ipynb) as f: nb = reads(f.read(), 'json') test_notebook(nb)