"""test IPython.embed_kernel()"""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import os
import sys
import time
from contextlib import contextmanager
from subprocess import Popen, PIPE
from jupyter_client import BlockingKernelClient
from jupyter_core import paths
from ipython_genutils import py3compat
from ipython_genutils.py3compat import unicode_type
# Upper bound, in seconds, on how long setup_kernel() polls for the kernel's
# connection file before giving up.
SETUP_TIMEOUT = 60
# Passed as the ``timeout`` argument of every get_shell_msg() call in the tests
# below (presumably seconds, like SETUP_TIMEOUT -- confirm against jupyter_client).
TIMEOUT = 15
@contextmanager
def setup_kernel(cmd):
    """Start an embedded kernel in a subprocess and wait for it to be ready.

    Parameters
    ----------
    cmd : str
        Python source handed to ``sys.executable -c``.

    Yields
    ------
    client : BlockingKernelClient
        Client created from the subprocess's connection file, with its
        channels started and ``wait_for_ready()`` already completed.

    Raises
    ------
    IOError
        If the subprocess exits before the connection file is usable, or if
        no connection file exists after ``SETUP_TIMEOUT`` seconds.
    """
    import json  # local import: only needed by the readiness probe below

    def connection_file_ready(path):
        """Return True once *path* can be opened and parsed as JSON."""
        try:
            with open(path) as f:
                json.load(f)
        except (IOError, OSError, ValueError):
            # Missing, unreadable, or still being written by the kernel.
            return False
        return True

    kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE)
    try:
        connection_file = os.path.join(
            paths.jupyter_runtime_dir(),
            'kernel-%i.json' % kernel.pid,
        )

        # Poll until the connection file is complete, the subprocess dies,
        # or SETUP_TIMEOUT seconds have elapsed -- whichever comes first.
        deadline = time.time() + SETUP_TIMEOUT
        while (
            not connection_file_ready(connection_file)
            and kernel.poll() is None
            and time.time() < deadline
        ):
            time.sleep(0.1)

        if kernel.poll() is not None:
            # The subprocess exited on its own; surface its stderr.
            o, e = kernel.communicate()
            e = py3compat.cast_unicode(e)
            raise IOError("Kernel failed to start:\n%s" % e)

        if not os.path.exists(connection_file):
            # The still-running subprocess is terminated by the outer finally.
            raise IOError("Connection file %r never arrived" % connection_file)

        client = BlockingKernelClient(connection_file=connection_file)
        client.load_connection_file()
        client.start_channels()
        try:
            # wait_for_ready() sits inside the try so that the channels are
            # stopped even when the kernel never becomes ready.
            client.wait_for_ready()
            yield client
        finally:
            client.stop_channels()
    finally:
        # Always clean up the subprocess, including when setup itself failed.
        if kernel.poll() is None:
            kernel.terminate()
        # Reap the child so it does not linger as a zombie.
        kernel.wait()
        # Release the pipe file descriptors opened by Popen.
        for stream in (kernel.stdin, kernel.stdout, kernel.stderr):
            if stream is not None:
                stream.close()
def test_embed_kernel_basic():
    """Check that an embedded kernel answers inspect and execute requests."""
    script_lines = (
        'from IPython import embed_kernel',
        'def go():',
        ' a=5',
        ' b="hi there"',
        ' embed_kernel()',
        'go()',
        '',
    )
    cmd = '\n'.join(script_lines)

    with setup_kernel(cmd) as client:
        def next_shell_content():
            # Content dict of the next message on the shell channel.
            reply = client.get_shell_msg(block=True, timeout=TIMEOUT)
            return reply['content']

        # `a` is assigned in go() before embed_kernel() is called.
        client.inspect('a')
        assert next_shell_content()['found']

        # Define `c` from `a` inside the embedded kernel.
        client.execute("c=a*2")
        assert next_shell_content()['status'] == u'ok'

        # `c` should now be inspectable and show 10.
        client.inspect('c')
        inspected = next_shell_content()
        assert inspected['found']
        rendered = inspected['data']['text/plain']
        assert '10' in rendered
def test_embed_kernel_namespace():
    """Check that the embedded kernel sees the names of the calling function."""
    script_lines = (
        'from IPython import embed_kernel',
        'def go():',
        ' a=5',
        ' b="hi there"',
        ' embed_kernel()',
        'go()',
        '',
    )
    cmd = '\n'.join(script_lines)

    with setup_kernel(cmd) as client:
        def inspect_content(name):
            # Send an inspect request and return the content of the next
            # shell-channel message.
            client.inspect(name)
            reply = client.get_shell_msg(block=True, timeout=TIMEOUT)
            return reply['content']

        # `a` is the int 5 assigned in go().
        info_a = inspect_content('a')
        assert info_a['found']
        assert u'5' in info_a['data']['text/plain']

        # `b` is the string "hi there" assigned in go().
        info_b = inspect_content('b')
        assert info_b['found']
        assert u'hi there' in info_b['data']['text/plain']

        # `c` is never assigned by the script, so it must not be found.
        info_c = inspect_content('c')
        assert not info_c['found']
def test_embed_kernel_reentrant():
    """IPython.embed_kernel() can be called multiple times.

    The subprocess script calls go() in an endless loop; go() calls
    embed_kernel() and increments the global ``count`` once that call
    returns.  On each of five rounds the test inspects ``count``, checks that
    the round number appears in its text representation, and then asks the
    kernel to exit so the script moves on to the next embed_kernel() call.
    """
    cmd = '\n'.join([
        'from IPython import embed_kernel',
        'count = 0',
        'def go():',
        ' global count',
        ' embed_kernel()',
        ' count = count + 1',
        '',
        # The loop header and its body are separate list items so they are
        # joined with '\n' like every other line, rather than being glued
        # into one literal by implicit string concatenation.
        'while True:',
        ' go()',
        '',
    ])

    with setup_kernel(cmd) as client:
        for i in range(5):
            client.inspect('count')
            msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
            content = msg['content']
            assert content['found']
            text = content['data']['text/plain']
            assert unicode_type(i) in text

            # exit from embed_kernel
            client.execute("get_ipython().exit_now = True")
            # Consume the next shell message so the following round's
            # inspect reply is the next one read.
            client.get_shell_msg(block=True, timeout=TIMEOUT)
            # Brief pause before talking to the kernel again (presumably to
            # let the script re-enter embed_kernel() -- confirm).
            time.sleep(0.2)