from __future__ import absolute_import, division, print_function
from tornado import gen, ioloop
from tornado.log import app_log
from tornado.simple_httpclient import SimpleAsyncHTTPClient, HTTPTimeoutError
from tornado.test.util import unittest, skipBefore35, exec_test, ignore_deprecation
from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, bind_unused_port, gen_test, ExpectLog
from tornado.web import Application
import contextlib
import os
import platform
import traceback
import warnings
try:
import asyncio
except ImportError:
asyncio = None
@contextlib.contextmanager
def set_environ(name, value):
old_value = os.environ.get(name)
os.environ[name] = value
try:
yield
finally:
if old_value is None:
del os.environ[name]
else:
os.environ[name] = old_value
class AsyncTestCaseTest(AsyncTestCase):
def test_exception_in_callback(self):
with ignore_deprecation():
self.io_loop.add_callback(lambda: 1 / 0)
try:
self.wait()
self.fail("did not get expected exception")
except ZeroDivisionError:
pass
def test_wait_timeout(self):
time = self.io_loop.time
# Accept default 5-second timeout, no error
self.io_loop.add_timeout(time() + 0.01, self.stop)
self.wait()
# Timeout passed to wait()
self.io_loop.add_timeout(time() + 1, self.stop)
with self.assertRaises(self.failureException):
self.wait(timeout=0.01)
# Timeout set with environment variable
self.io_loop.add_timeout(time() + 1, self.stop)
with set_environ('ASYNC_TEST_TIMEOUT', '0.01'):
with self.assertRaises(self.failureException):
self.wait()
def test_subsequent_wait_calls(self):
"""
This test makes sure that a second call to wait()
clears the first timeout.
"""
self.io_loop.add_timeout(self.io_loop.time() + 0.00, self.stop)
self.wait(timeout=0.02)
self.io_loop.add_timeout(self.io_loop.time() + 0.03, self.stop)
self.wait(timeout=0.15)
def test_multiple_errors(self):
with ignore_deprecation():
def fail(message):
raise Exception(message)
self.io_loop.add_callback(lambda: fail("error one"))
self.io_loop.add_callback(lambda: fail("error two"))
# The first error gets raised; the second gets logged.
with ExpectLog(app_log, "multiple unhandled exceptions"):
with self.assertRaises(Exception) as cm:
self.wait()
self.assertEqual(str(cm.exception), "error one")
class AsyncHTTPTestCaseTest(AsyncHTTPTestCase):
@classmethod
def setUpClass(cls):
super(AsyncHTTPTestCaseTest, cls).setUpClass()
# An unused port is bound so we can make requests upon it without
# impacting a real local web server.
cls.external_sock, cls.external_port = bind_unused_port()
def get_app(self):
return Application()
def test_fetch_segment(self):
path = '/path'
response = self.fetch(path)
self.assertEqual(response.request.url, self.get_url(path))
@gen_test
def test_fetch_full_http_url(self):
path = 'http://localhost:%d/path' % self.external_port
with contextlib.closing(SimpleAsyncHTTPClient(force_instance=True)) as client:
with self.assertRaises(HTTPTimeoutError) as cm:
yield client.fetch(path, request_timeout=0.1, raise_error=True)
self.assertEqual(cm.exception.response.request.url, path)
@gen_test
def test_fetch_full_https_url(self):
path = 'https://localhost:%d/path' % self.external_port
with contextlib.closing(SimpleAsyncHTTPClient(force_instance=True)) as client:
with self.assertRaises(HTTPTimeoutError) as cm:
yield client.fetch(path, request_timeout=0.1, raise_error=True)
self.assertEqual(cm.exception.response.request.url, path)
@classmethod
def tearDownClass(cls):
cls.external_sock.close()
super(AsyncHTTPTestCaseTest, cls).tearDownClass()
class AsyncTestCaseWrapperTest(unittest.TestCase):
def test_undecorated_generator(self):
class Test(AsyncTestCase):
def test_gen(self):
yield
test = Test('test_gen')
result = unittest.TestResult()
test.run(result)
self.assertEqual(len(result.errors), 1)
self.assertIn("should be decorated", result.errors[0][1])
@skipBefore35
@unittest.skipIf(platform.python_implementation() == 'PyPy',
'pypy destructor warnings cannot be silenced')
def test_undecorated_coroutine(self):
namespace = exec_test(globals(), locals(), """
class Test(AsyncTestCase):
async def test_coro(self):
pass
""")
test_class = namespace['Test']
test = test_class('test_coro')
result = unittest.TestResult()
# Silence "RuntimeWarning: coroutine 'test_coro' was never awaited".
with warnings.catch_warnings():
warnings.simplefilter('ignore')
test.run(result)
self.assertEqual(len(result.errors), 1)
self.assertIn("should be decorated", result.errors[0][1])
def test_undecorated_generator_with_skip(self):
class Test(AsyncTestCase):
@unittest.skip("don't run this")
def test_gen(self):
yield
test = Test('test_gen')
result = unittest.TestResult()
test.run(result)
self.assertEqual(len(result.errors), 0)
self.assertEqual(len(result.skipped), 1)
def test_other_return(self):
class Test(AsyncTestCase):
def test_other_return(self):
return 42
test = Test('test_other_return')
result = unittest.TestResult()
test.run(result)
self.assertEqual(len(result.errors), 1)
self.assertIn("Return value from test method ignored", result.errors[0][1])
class SetUpTearDownTest(unittest.TestCase):
def test_set_up_tear_down(self):
"""
This test makes sure that AsyncTestCase calls super methods for
setUp and tearDown.
InheritBoth is a subclass of both AsyncTestCase and
SetUpTearDown, with the ordering so that the super of
AsyncTestCase will be SetUpTearDown.
"""
events = []
result = unittest.TestResult()
class SetUpTearDown(unittest.TestCase):
def setUp(self):
events.append('setUp')
def tearDown(self):
events.append('tearDown')
class InheritBoth(AsyncTestCase, SetUpTearDown):
def test(self):
events.append('test')
InheritBoth('test').run(result)
expected = ['setUp', 'test', 'tearDown']
self.assertEqual(expected, events)
class GenTest(AsyncTestCase):
def setUp(self):
super(GenTest, self).setUp()
self.finished = False
def tearDown(self):
self.assertTrue(self.finished)
super(GenTest, self).tearDown()
@gen_test
def test_sync(self):
self.finished = True
@gen_test
def test_async(self):
yield gen.moment
self.finished = True
def test_timeout(self):
# Set a short timeout and exceed it.
@gen_test(timeout=0.1)
def test(self):
yield gen.sleep(1)
# This can't use assertRaises because we need to inspect the
# exc_info triple (and not just the exception object)
try:
test(self)
self.fail("did not get expected exception")
except ioloop.TimeoutError:
# The stack trace should blame the add_timeout line, not just
# unrelated IOLoop/testing internals.
self.assertIn(
"gen.sleep(1)",
traceback.format_exc())
self.finished = True
def test_no_timeout(self):
# A test that does not exceed its timeout should succeed.
@gen_test(timeout=1)
def test(self):
yield gen.sleep(0.1)
test(self)
self.finished = True
def test_timeout_environment_variable(self):
@gen_test(timeout=0.5)
def test_long_timeout(self):
yield gen.sleep(0.25)
# Uses provided timeout of 0.5 seconds, doesn't time out.
with set_environ('ASYNC_TEST_TIMEOUT', '0.1'):
test_long_timeout(self)
self.finished = True
def test_no_timeout_environment_variable(self):
@gen_test(timeout=0.01)
def test_short_timeout(self):
yield gen.sleep(1)
# Uses environment-variable timeout of 0.1, times out.
with set_environ('ASYNC_TEST_TIMEOUT', '0.1'):
with self.assertRaises(ioloop.TimeoutError):
test_short_timeout(self)
self.finished = True
def test_with_method_args(self):
@gen_test
def test_with_args(self, *args):
self.assertEqual(args, ('test',))
yield gen.moment
test_with_args(self, 'test')
self.finished = True
def test_with_method_kwargs(self):
@gen_test
def test_with_kwargs(self, **kwargs):
self.assertDictEqual(kwargs, {'test': 'test'})
yield gen.moment
test_with_kwargs(self, test='test')
self.finished = True
@skipBefore35
def test_native_coroutine(self):
namespace = exec_test(globals(), locals(), """
@gen_test
async def test(self):
self.finished = True
""")
namespace['test'](self)
@skipBefore35
def test_native_coroutine_timeout(self):
# Set a short timeout and exceed it.
namespace = exec_test(globals(), locals(), """
@gen_test(timeout=0.1)
async def test(self):
await gen.sleep(1)
""")
try:
namespace['test'](self)
self.fail("did not get expected exception")
except ioloop.TimeoutError:
self.finished = True
@unittest.skipIf(asyncio is None, "asyncio module not present")
class GetNewIOLoopTest(AsyncTestCase):
def get_new_ioloop(self):
# Use the current loop instead of creating a new one here.
return ioloop.IOLoop.current()
def setUp(self):
# This simulates the effect of an asyncio test harness like
# pytest-asyncio.
self.orig_loop = asyncio.get_event_loop()
self.new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.new_loop)
super(GetNewIOLoopTest, self).setUp()
def tearDown(self):
super(GetNewIOLoopTest, self).tearDown()
# AsyncTestCase must not affect the existing asyncio loop.
self.assertFalse(asyncio.get_event_loop().is_closed())
asyncio.set_event_loop(self.orig_loop)
self.new_loop.close()
def test_loop(self):
self.assertIs(self.io_loop.asyncio_loop, self.new_loop)
if __name__ == '__main__':
unittest.main()