master
/ .localenv / lib / python3.5 / site-packages / ipykernel / tests / test_embed_kernel.py

test_embed_kernel.py @master

4d078e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
"""test IPython.embed_kernel()"""

# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

import os
import sys
import time

from contextlib import contextmanager
from subprocess import Popen, PIPE

from jupyter_client import BlockingKernelClient
from jupyter_core import paths
from ipython_genutils import py3compat
from ipython_genutils.py3compat import unicode_type


# Upper bound, in seconds, on how long setup_kernel() polls for the
# subprocess's connection file before giving up.
SETUP_TIMEOUT = 60
# Value passed as ``timeout=`` to every client.get_shell_msg() call in the tests.
TIMEOUT = 15


@contextmanager
def setup_kernel(cmd):
    """Start an embedded kernel in a subprocess and wait for it to be ready.

    Parameters
    ----------
    cmd : str
        Python source executed in the subprocess via ``sys.executable -c``.

    Yields
    ------
    client : BlockingKernelClient
        Client loaded from the subprocess's connection file
        (``kernel-<pid>.json`` in the Jupyter runtime dir), with its channels
        started and ``wait_for_ready()`` completed.

    Raises
    ------
    IOError
        If the subprocess exits before the client is created, or if the
        connection file has not appeared after ``SETUP_TIMEOUT`` seconds.

    Notes
    -----
    The subprocess is always terminated and reaped, and its pipes closed,
    when the context exits -- including when setup itself fails.
    """
    kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE)
    try:
        connection_file = os.path.join(
            paths.jupyter_runtime_dir(),
            'kernel-%i.json' % kernel.pid,
        )
        # Poll until the connection file exists, the subprocess dies, or
        # SETUP_TIMEOUT seconds have elapsed.
        deadline = time.time() + SETUP_TIMEOUT
        while not os.path.exists(connection_file) \
            and kernel.poll() is None \
            and time.time() < deadline:
            time.sleep(0.1)

        if kernel.poll() is not None:
            # The subprocess already exited: surface its stderr in the error.
            o, e = kernel.communicate()
            e = py3compat.cast_unicode(e)
            raise IOError("Kernel failed to start:\n%s" % e)

        if not os.path.exists(connection_file):
            # Timed out; the outer ``finally`` terminates the subprocess.
            raise IOError("Connection file %r never arrived" % connection_file)

        client = BlockingKernelClient(connection_file=connection_file)
        client.load_connection_file()
        client.start_channels()
        try:
            # Inside the try so channels are stopped even if readiness fails.
            client.wait_for_ready()
            yield client
        finally:
            client.stop_channels()
    finally:
        # Only signal a process that is still running, then reap it so no
        # zombie is left behind.
        if kernel.poll() is None:
            kernel.terminate()
        kernel.wait()
        # Release the pipe file descriptors (closing twice is harmless).
        for pipe in (kernel.stdout, kernel.stderr):
            if pipe is not None:
                pipe.close()

def test_embed_kernel_basic():
    """IPython.embed_kernel() is basically functional"""
    source_lines = [
        'from IPython import embed_kernel',
        'def go():',
        '    a=5',
        '    b="hi there"',
        '    embed_kernel()',
        'go()',
        '',
    ]
    cmd = '\n'.join(source_lines)

    with setup_kernel(cmd) as client:
        # `a` is a local of go(), so inspecting it must succeed
        client.inspect('a')
        reply = client.get_shell_msg(block=True, timeout=TIMEOUT)
        assert reply['content']['found']

        # define `c` from `a` inside the embedded kernel
        client.execute("c=a*2")
        reply = client.get_shell_msg(block=True, timeout=TIMEOUT)
        assert reply['content']['status'] == u'ok'

        # `c` must now be found, and its plain-text repr must contain 10
        client.inspect('c')
        reply = client.get_shell_msg(block=True, timeout=TIMEOUT)
        info = reply['content']
        assert info['found']
        assert '10' in info['data']['text/plain']

def test_embed_kernel_namespace():
    """IPython.embed_kernel() inherits calling namespace"""
    source_lines = [
        'from IPython import embed_kernel',
        'def go():',
        '    a=5',
        '    b="hi there"',
        '    embed_kernel()',
        'go()',
        '',
    ]
    cmd = '\n'.join(source_lines)

    # locals of go() and the text their inspection output must contain
    expected_locals = (
        ('a', u'5'),          # int
        ('b', u'hi there'),   # str
    )

    with setup_kernel(cmd) as client:
        for name, fragment in expected_locals:
            client.inspect(name)
            reply = client.get_shell_msg(block=True, timeout=TIMEOUT)
            info = reply['content']
            assert info['found']
            assert fragment in info['data']['text/plain']

        # `c` was never defined in go(), so it must not be found
        client.inspect('c')
        reply = client.get_shell_msg(block=True, timeout=TIMEOUT)
        assert not reply['content']['found']

def test_embed_kernel_reentrant():
    """IPython.embed_kernel() can be called multiple times

    The subprocess runs ``go()`` in an endless loop; each call embeds a
    kernel and, once that kernel exits, increments the global ``count``.
    The test checks that ``count`` reads 0..4 over five successive kernels.
    """
    cmd = '\n'.join([
        'from IPython import embed_kernel',
        'count = 0',
        'def go():',
        '    global count',
        '    embed_kernel()',
        '    count = count + 1',
        '',
        # The original was missing the comma after this entry, so the two
        # literals were silently concatenated into one line.
        'while True:',
        '    go()',
        '',
    ])

    with setup_kernel(cmd) as client:
        for i in range(5):
            # `count` must equal the number of kernels exited so far
            client.inspect('count')
            msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
            content = msg['content']
            assert content['found']
            text = content['data']['text/plain']
            assert unicode_type(i) in text

            # exit from embed_kernel
            client.execute("get_ipython().exit_now = True")
            # consume the execute reply so it is not read as the next
            # iteration's inspect reply
            client.get_shell_msg(block=True, timeout=TIMEOUT)
            # presumably gives the subprocess time to leave the old kernel
            # and re-enter embed_kernel() -- TODO confirm 0.2s is sufficient
            time.sleep(0.2)