master
/ .localenv / lib / python3.5 / site-packages / nbconvert / preprocessors / tests / test_execute.py

test_execute.py @master raw · history · blame

# coding=utf-8

"""
Module with tests for the execute preprocessor.
"""

# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

from base64 import b64encode, b64decode
import copy
import glob
import io
import os
import re

import nbformat
import sys
import pytest

from .base import PreprocessorTestsBase
from ..execute import ExecutePreprocessor, CellExecutionError, executenb

from nbconvert.filters import strip_ansi
from testpath import modified_env
from ipython_genutils.py3compat import string_types

addr_pat = re.compile(r'0x[0-9a-f]{7,9}')
ipython_input_pat = re.compile(r'<ipython-input-\d+-[0-9a-f]+>')
current_dir = os.path.dirname(__file__)

def _normalize_base64(b64_text):
    # if it's base64, pass it through b64 decode/encode to avoid
    # equivalent values from being considered unequal
    try:
        return b64encode(b64decode(b64_text.encode('ascii'))).decode('ascii')
    except (ValueError, TypeError):
        return b64_text
    
class TestExecute(PreprocessorTestsBase):
    """Contains test functions for execute.py"""
    maxDiff = None

    @staticmethod
    def normalize_output(output):
        """
        Normalizes outputs for comparison.
        """
        output = dict(output)
        if 'metadata' in output:
            del output['metadata']
        if 'text' in output:
            output['text'] = re.sub(addr_pat, '<HEXADDR>', output['text'])
        if 'text/plain' in output.get('data', {}):
            output['data']['text/plain'] = \
                re.sub(addr_pat, '<HEXADDR>', output['data']['text/plain'])
        for key, value in output.get('data', {}).items():
            if isinstance(value, string_types):
                output['data'][key] = _normalize_base64(value)
        if 'traceback' in output:
            tb = [
                re.sub(ipython_input_pat, '<IPY-INPUT>', strip_ansi(line))
                for line in output['traceback']
            ]
            output['traceback'] = tb

        return output


    def assert_notebooks_equal(self, expected, actual):
        expected_cells = expected['cells']
        actual_cells = actual['cells']
        self.assertEqual(len(expected_cells), len(actual_cells))

        for expected_cell, actual_cell in zip(expected_cells, actual_cells):
            expected_outputs = expected_cell.get('outputs', [])
            actual_outputs = actual_cell.get('outputs', [])
            normalized_expected_outputs = list(map(self.normalize_output, expected_outputs))
            normalized_actual_outputs = list(map(self.normalize_output, actual_outputs))
            self.assertEqual(normalized_expected_outputs, normalized_actual_outputs)

            expected_execution_count = expected_cell.get('execution_count', None)
            actual_execution_count = actual_cell.get('execution_count', None)
            self.assertEqual(expected_execution_count, actual_execution_count)


    def build_preprocessor(self, opts):
        """Make an instance of a preprocessor"""
        preprocessor = ExecutePreprocessor()
        preprocessor.enabled = True
        for opt in opts:
            setattr(preprocessor, opt, opts[opt])
        return preprocessor


    def test_constructor(self):
        """Can a ExecutePreprocessor be constructed?"""
        self.build_preprocessor({})


    def run_notebook(self, filename, opts, resources):
        """Loads and runs a notebook, returning both the version prior to
        running it and the version after running it.

        """
        with io.open(filename) as f:
            input_nb = nbformat.read(f, 4)

        preprocessor = self.build_preprocessor(opts)
        cleaned_input_nb = copy.deepcopy(input_nb)
        for cell in cleaned_input_nb.cells:
            if 'execution_count' in cell:
                del cell['execution_count']
            cell['outputs'] = []

        # Override terminal size to standardise traceback format
        with modified_env({'COLUMNS': '80', 'LINES': '24'}):
            output_nb, _ = preprocessor(cleaned_input_nb, resources)

        return input_nb, output_nb

    def test_run_notebooks(self):
        """Runs a series of test notebooks and compares them to their actual output"""
        input_files = glob.glob(os.path.join(current_dir, 'files', '*.ipynb'))
        for filename in input_files:
            if os.path.basename(filename) == "Disable Stdin.ipynb":
                continue
            elif os.path.basename(filename) == "Interrupt.ipynb":
                opts = dict(timeout=1, interrupt_on_timeout=True, allow_errors=True)
            elif os.path.basename(filename) == "Skip Exceptions.ipynb":
                opts = dict(allow_errors=True)
            else:
                opts = dict()
            res = self.build_resources()
            res['metadata']['path'] = os.path.dirname(filename)
            input_nb, output_nb = self.run_notebook(filename, opts, res)
            self.assert_notebooks_equal(input_nb, output_nb)

    def test_empty_path(self):
        """Can the kernel be started when the path is empty?"""
        filename = os.path.join(current_dir, 'files', 'HelloWorld.ipynb')
        res = self.build_resources()
        res['metadata']['path'] = ''
        input_nb, output_nb = self.run_notebook(filename, {}, res)
        self.assert_notebooks_equal(input_nb, output_nb)

    def test_disable_stdin(self):
        """Test disabling standard input"""
        filename = os.path.join(current_dir, 'files', 'Disable Stdin.ipynb')
        res = self.build_resources()
        res['metadata']['path'] = os.path.dirname(filename)
        input_nb, output_nb = self.run_notebook(filename, dict(allow_errors=True), res)

        # We need to special-case this particular notebook, because the
        # traceback contains machine-specific stuff like where IPython
        # is installed. It is sufficient here to just check that an error
        # was thrown, and that it was a StdinNotImplementedError
        self.assertEqual(len(output_nb['cells']), 1)
        self.assertEqual(len(output_nb['cells'][0]['outputs']), 1)
        output = output_nb['cells'][0]['outputs'][0]
        self.assertEqual(output['output_type'], 'error')
        self.assertEqual(output['ename'], 'StdinNotImplementedError')
        self.assertEqual(output['evalue'], 'raw_input was called, but this frontend does not support input requests.')

    def test_timeout(self):
        """Check that an error is raised when a computation times out"""
        current_dir = os.path.dirname(__file__)
        filename = os.path.join(current_dir, 'files', 'Interrupt.ipynb')
        res = self.build_resources()
        res['metadata']['path'] = os.path.dirname(filename)
        try:
            exception = TimeoutError
        except NameError:
            exception = RuntimeError

        with pytest.raises(exception):
            self.run_notebook(filename, dict(timeout=1), res)

    def test_timeout_func(self):
        """Check that an error is raised when a computation times out"""
        current_dir = os.path.dirname(__file__)
        filename = os.path.join(current_dir, 'files', 'Interrupt.ipynb')
        res = self.build_resources()
        res['metadata']['path'] = os.path.dirname(filename)
        try:
            exception = TimeoutError
        except NameError:
            exception = RuntimeError

        def timeout_func(source):
            return 10

        with pytest.raises(exception):
            self.run_notebook(filename, dict(timeout_func=timeout_func), res)

    def test_allow_errors(self):
        """
        Check that conversion continues if ``allow_errors`` is False.
        """
        current_dir = os.path.dirname(__file__)
        filename = os.path.join(current_dir, 'files', 'Skip Exceptions.ipynb')
        res = self.build_resources()
        res['metadata']['path'] = os.path.dirname(filename)
        with pytest.raises(CellExecutionError) as exc:
            self.run_notebook(filename, dict(allow_errors=False), res)
        self.assertIsInstance(str(exc.value), str)
        if sys.version_info >= (3, 0):
            assert u"# üñîçø∂é" in str(exc.value)
        else:
            assert u"# üñîçø∂é".encode('utf8', 'replace') in str(exc.value)

    def test_custom_kernel_manager(self):
        from .fake_kernelmanager import FakeCustomKernelManager

        current_dir = os.path.dirname(__file__)

        filename = os.path.join(current_dir, 'files', 'HelloWorld.ipynb')

        with io.open(filename) as f:
            input_nb = nbformat.read(f, 4)

        preprocessor = self.build_preprocessor({
            'kernel_manager_class': FakeCustomKernelManager
        })

        cleaned_input_nb = copy.deepcopy(input_nb)
        for cell in cleaned_input_nb.cells:
            if 'execution_count' in cell:
                del cell['execution_count']
            cell['outputs'] = []

        # Override terminal size to standardise traceback format
        with modified_env({'COLUMNS': '80', 'LINES': '24'}):
            output_nb, _ = preprocessor(cleaned_input_nb,
                                        self.build_resources())

        expected = FakeCustomKernelManager.expected_methods.items()

        for method, call_count in expected:
            self.assertNotEqual(call_count, 0, '{} was called'.format(method))

    def test_execute_function(self):
        # Test the executenb() convenience API
        current_dir = os.path.dirname(__file__)
        filename = os.path.join(current_dir, 'files', 'HelloWorld.ipynb')

        with io.open(filename) as f:
            input_nb = nbformat.read(f, 4)

        original = copy.deepcopy(input_nb)
        executed = executenb(original, os.path.dirname(filename))
        self.assert_notebooks_equal(original, executed)