master
/ .localenv / lib / python3.5 / site-packages / ipykernel / tests / utils.py

utils.py @master raw · history · blame

"""utilities for testing IPython kernels"""

# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

from __future__ import print_function

import atexit
import os
import sys

from contextlib import contextmanager
from subprocess import PIPE, STDOUT
try:
    from queue import Empty  # Py 3
except ImportError:
    from Queue import Empty  # Py 2

import nose

from jupyter_client import manager


STARTUP_TIMEOUT = 60
TIMEOUT = 15

KM = None
KC = None


def start_new_kernel(**kwargs):
    """start a new kernel, and return its Manager and Client

    Integrates with our output capturing for tests.
    """
    try:
        stdout = nose.iptest_stdstreams_fileno()
    except AttributeError:
        stdout = open(os.devnull)
    kwargs.update(dict(stdout=stdout, stderr=STDOUT))
    return manager.start_new_kernel(startup_timeout=STARTUP_TIMEOUT, **kwargs)


def flush_channels(kc=None):
    """flush any messages waiting on the queue"""
    from .test_message_spec import validate_message

    if kc is None:
        kc = KC
    for channel in (kc.shell_channel, kc.iopub_channel):
        while True:
            try:
                msg = channel.get_msg(block=True, timeout=0.1)
            except Empty:
                break
            else:
                validate_message(msg)


def execute(code='', kc=None, **kwargs):
    """wrapper for doing common steps for validating an execution request"""
    from .test_message_spec import validate_message
    if kc is None:
        kc = KC
    msg_id = kc.execute(code=code, **kwargs)
    reply = kc.get_shell_msg(timeout=TIMEOUT)
    validate_message(reply, 'execute_reply', msg_id)
    busy = kc.get_iopub_msg(timeout=TIMEOUT)
    validate_message(busy, 'status', msg_id)
    assert busy['content']['execution_state'] == 'busy'

    if not kwargs.get('silent'):
        execute_input = kc.get_iopub_msg(timeout=TIMEOUT)
        validate_message(execute_input, 'execute_input', msg_id)
        assert execute_input['content']['code'] == code

    # show tracebacks if present for debugging
    if reply['content'].get('traceback'):
        print('\n'.join(reply['content']['traceback']), file=sys.stderr)


    return msg_id, reply['content']

def start_global_kernel():
    """start the global kernel (if it isn't running) and return its client"""
    global KM, KC
    if KM is None:
        KM, KC = start_new_kernel()
        atexit.register(stop_global_kernel)
    else:
        flush_channels(KC)
    return KC

@contextmanager
def kernel():
    """Context manager for the global kernel instance

    Should be used for most kernel tests

    Returns
    -------
    kernel_client: connected KernelClient instance
    """
    yield start_global_kernel()

def uses_kernel(test_f):
    """Decorator for tests that use the global kernel"""
    def wrapped_test():
        with kernel() as kc:
            test_f(kc)
    wrapped_test.__doc__ = test_f.__doc__
    wrapped_test.__name__ = test_f.__name__
    return wrapped_test

def stop_global_kernel():
    """Stop the global shared kernel instance, if it exists"""
    global KM, KC
    KC.stop_channels()
    KC = None
    if KM is None:
        return
    KM.shutdown_kernel(now=True)
    KM = None

def new_kernel(argv=None):
    """Context manager for a new kernel in a subprocess

    Should only be used for tests where the kernel must not be re-used.

    Returns
    -------
    kernel_client: connected KernelClient instance
    """
    stdout = getattr(nose, 'iptest_stdstreams_fileno', open(os.devnull))
    kwargs = dict(stdout=stdout, stderr=STDOUT)
    if argv is not None:
        kwargs['extra_arguments'] = argv
    return manager.run_kernel(**kwargs)

def assemble_output(iopub):
    """assemble stdout/err from an execution"""
    stdout = ''
    stderr = ''
    while True:
        msg = iopub.get_msg(block=True, timeout=1)
        msg_type = msg['msg_type']
        content = msg['content']
        if msg_type == 'status' and content['execution_state'] == 'idle':
            # idle message signals end of output
            break
        elif msg['msg_type'] == 'stream':
            if content['name'] == 'stdout':
                stdout += content['text']
            elif content['name'] == 'stderr':
                stderr += content['text']
            else:
                raise KeyError("bad stream: %r" % content['name'])
        else:
            # other output, ignored
            pass
    return stdout, stderr

def wait_for_idle(kc):
    while True:
        msg = kc.iopub_channel.get_msg(block=True, timeout=1)
        msg_type = msg['msg_type']
        content = msg['content']
        if msg_type == 'status' and content['execution_state'] == 'idle':
            break