blob: 94a1cd2e97c5f2d571869beba48b8028a3b06e87 [file] [log] [blame] [edit]
# async test cases (compile and run)
[case testRunAsyncBasics]
import asyncio
from typing import Callable, Awaitable
from testutil import assertRaises
# Leaf coroutine: completes without ever suspending.
async def h() -> int:
    return 1

# Suspends once through asyncio.sleep(0), then delegates to h().
async def g() -> int:
    await asyncio.sleep(0)
    return await h()

# Uses an awaited value as one operand of a larger expression.
async def f() -> int:
    return await g() + 2

# Two awaits inside one expression, repeated in a loop:
# (0 + 3 + 1) + (1 + 3 + 1) == 9.
async def f2() -> int:
    x = 0
    for i in range(2):
        x += i + await f() + await g()
    return x
# f() awaits g() (which yields 1) and adds 2.
async def test_simple_call() -> None:
    result = await f()
    assert result == 3

# f2() adds up 4 and 5 over its two loop iterations.
async def test_multiple_awaits_in_expression() -> None:
    result = await f2()
    assert result == 9
# Exception type raised by the coroutines below.
class MyError(Exception):
    pass

# Raises after one suspension point.
async def exc1() -> None:
    await asyncio.sleep(0)
    raise MyError()

# Same shape as exc1, kept as a separate coroutine.
async def exc2() -> None:
    await asyncio.sleep(0)
    raise MyError()

# Lets MyError propagate through one extra await frame.
async def exc3() -> None:
    await exc1()

async def exc4() -> None:
    await exc2()

# Catches an exception raised directly by the awaited coroutine.
async def exc5() -> int:
    try:
        await exc1()
    except MyError:
        return 3
    return 4

# Catches an exception that crossed two await frames (exc4 -> exc2).
async def exc6() -> int:
    try:
        await exc4()
    except MyError:
        return 3
    return 4
# MyError must surface from every raising coroutine, and the two catching
# coroutines must take their `except` branch (returning 3, not 4).
async def test_exception() -> None:
    with assertRaises(MyError):
        await exc1()
    with assertRaises(MyError):
        await exc2()
    with assertRaises(MyError):
        await exc3()
    with assertRaises(MyError):
        await exc4()
    assert await exc5() == 3
    assert await exc6() == 3
# Awaits the result of calling a coroutine function passed in as a value.
async def indirect_call(x: int, c: Callable[[int], Awaitable[int]]) -> int:
    return await c(x)

# Awaits an awaitable that the caller has already created.
async def indirect_call_2(a: Awaitable[None]) -> None:
    await a

# Same as indirect_call_2, but the awaited float feeds an arithmetic expression.
async def indirect_call_3(a: Awaitable[float]) -> float:
    return (await a) + 1.0

async def inc(x: int) -> int:
    await asyncio.sleep(0)
    return x + 1

# Returns x unchanged (adds 0.0) or raises MyError when `err` is true.
# NOTE(review): float("0.0") is presumably used instead of a literal so the
# addition is not folded at compile time -- confirm.
async def ident(x: float, err: bool = False) -> float:
    await asyncio.sleep(0.0)
    if err:
        raise MyError()
    return x + float("0.0")
# Checks awaiting through callables and pre-built awaitables, on both the
# value path and the exception path.
# NOTE(review): -113.0 / -114.0 look like deliberately chosen edge values
# (possibly mypyc's float error sentinel) -- confirm.
async def test_indirect_call() -> None:
    assert await indirect_call(3, inc) == 4

    with assertRaises(MyError):
        await indirect_call_2(exc1())

    assert await indirect_call_3(ident(2.0)) == 3.0
    assert await indirect_call_3(ident(-113.0)) == -112.0
    assert await indirect_call_3(ident(-114.0)) == -113.0

    with assertRaises(MyError):
        await indirect_call_3(ident(1.0, True))
    with assertRaises(MyError):
        await indirect_call_3(ident(-113.0, True))
# Class with an async method that reads instance state after suspending.
class C:
    def __init__(self, n: int) -> None:
        self.n = n

    # Returns x + self.n, or raises MyError when `err` is true.
    async def add(self, x: int, err: bool = False) -> int:
        await asyncio.sleep(0)
        if err:
            raise MyError()
        return x + self.n

# Awaits an async method on a freshly created instance.
async def method_call(x: int) -> int:
    c = C(5)
    return await c.add(x)

# Same call shape, but the method raises (err passed by keyword).
async def method_call_exception() -> int:
    c = C(5)
    return await c.add(3, err=True)
# method_call(3) evaluates C(5).add(3); the exception variant must raise MyError.
async def test_async_method_call() -> None:
    assert await method_call(3) == 8
    with assertRaises(MyError):
        await method_call_exception()
[file asyncio/__init__.pyi]
async def sleep(t: float) -> None: ...
[typing fixtures/typing-full.pyi]
[case testRunAsyncAwaitInVariousPositions]
from typing import cast, Any
import asyncio
# Returns 1 after one suspension point.
# NOTE(review): `int() + 1` (0 + 1) is presumably spelled this way so the value
# is computed at run time rather than folded -- confirm.
async def one() -> int:
    await asyncio.sleep(0.0)
    return int() + 1

# bool(0 + 1) -> True, with the await inside the bool() argument.
async def true() -> bool:
    return bool(int() + await one())

# Await on the right-hand side of a comparison used as a branch condition.
async def branch_await() -> int:
    if bool(int() + 1) == await true():
        return 3
    return 2

# Same, with the awaited value negated: True == (not True) is false -> 2.
async def branch_await_not() -> int:
    if bool(int() + 1) == (not await true()):
        return 3
    return 2
async def test_branch() -> None:
    assert await branch_await() == 3
    assert await branch_await_not() == 2

# Await as the second element of a tuple-unpacking assignment.
async def assign_multi() -> int:
    _, x = int(), await one()
    return x + 1

# one() yields 1, so assign_multi() yields 2.
async def test_assign_multi() -> None:
    assert await assign_multi() == 2
# Attribute holder with one synchronous method.
class C:
    def __init__(self, s: str) -> None:
        self.s = s

    def concat(self, s: str) -> str:
        return self.s + s

# Awaits one() before constructing the instance.
async def make_c(s: str) -> C:
    await one()
    return C(s)

# Awaits one() before concatenating.
async def concat(s: str, t: str) -> str:
    await one()
    return s + t

# Await on both sides of an attribute assignment: the target object and the
# assigned value each come from an awaited call.
async def set_attr(s: str) -> None:
    (await make_c("xyz")).s = await concat(s, "!")

async def test_set_attr() -> None:
    await set_attr("foo")  # Just check that it compiles and runs
# Plain (non-async) two-argument helper.
def concat2(x: str, y: str) -> str:
    return x + y

# Await as the second argument of a synchronous call; the first argument is
# evaluated before the await.
async def call1(s: str) -> str:
    return concat2(str(int()), await concat(s, "a"))

# Await nested inside the argument list of another awaited call.
async def call2(s: str) -> str:
    return await concat(str(int()), await concat(s, "b"))

# str(int()) is "0", hence the "0" prefix in both results.
async def test_call() -> None:
    assert await call1("foo") == "0fooa"
    assert await call2("foo") == "0foob"

# Await as the argument of a method call on a temporary object.
async def method_call(s: str) -> str:
    return C("<").concat(await concat(s, ">"))

async def test_method_call() -> None:
    assert await method_call("foo") == "<foo>"
# Two-field class used to check awaits in constructor arguments.
class D:
    def __init__(self, a: str, b: str) -> None:
        self.a = a
        self.b = b

# Both constructor arguments are awaited values.
async def construct(s: str) -> str:
    c = D(await concat(s, "!"), await concat(s, "?"))
    return c.a + c.b

async def test_construct() -> None:
    assert await construct("foo") == "foo!foo?"
[file asyncio/__init__.pyi]
async def sleep(t: float) -> None: ...
[typing fixtures/typing-full.pyi]
[case testAsyncWith]
from testutil import async_val
# Async context manager whose enter and exit each await async_val, so both
# phases are visible to whoever drives the coroutine.
class async_ctx:
    async def __aenter__(self) -> str:
        await async_val("enter")
        return "test"

    async def __aexit__(self, x, y, z) -> None:
        await async_val("exit")

# Returns from inside the `async with` body; the driver expects "exit" to be
# yielded after "body", i.e. __aexit__ still runs on the return path.
async def async_with() -> str:
    async with async_ctx() as x:
        return await async_val("body")
[file driver.py]
from native import async_with
from testutil import run_generator
# NOTE(review): the list presumably holds the values sent back into the
# coroutine at each suspension point -- confirm against testutil.run_generator.
yields, val = run_generator(async_with(), [None, 'x', None])
# The yielded values must come from __aenter__, the body and __aexit__, in that order.
assert yields == ('enter', 'body', 'exit'), yields
# async_with() returns whatever its body's await evaluated to.
assert val == 'x', val
[case testAsyncReturn]
from testutil import async_val
# Returns from a `try` whose `finally` contains an await: the return value
# has to survive the suspension in the finally block.
async def async_return() -> str:
    try:
        return 'test'
    finally:
        await async_val('foo')
[file driver.py]
from native import async_return
from testutil import run_generator
# Drive the coroutine by hand and capture what it yields and returns.
yields, val = run_generator(async_return())
# The only suspension point is the await inside `finally`.
assert yields == ('foo',)
# The value returned from the `try` body must not be lost.
assert val == 'test', val
[case testAsyncFor]
from typing import AsyncIterable, List, Set, Dict
# `async for` statement that collects items into a list.
async def async_iter(xs: AsyncIterable[int]) -> List[int]:
    ys = []
    async for x in xs:
        ys.append(x)
    return ys

# Async list comprehension bound to a local before returning.
async def async_comp(xs: AsyncIterable[int]) -> List[int]:
    ys = [x async for x in xs]
    return ys

# Async set comprehension.
async def async_comp_set(xs: AsyncIterable[int]) -> Set[int]:
    return {x async for x in xs}

# Async dict comprehension mapping each item to its string form.
async def async_comp_dict(xs: AsyncIterable[int]) -> Dict[int, str]:
    return {x: str(x) async for x in xs}
[typing fixtures/typing-full.pyi]
[file driver.py]
from native import async_iter, async_comp, async_comp_set, async_comp_dict
from testutil import run_generator, async_val
from typing import AsyncIterable, List
# defined here since we couldn't do it inside the test yet...
# Async generator: awaits async_val(x) and then yields x, for x in 0..2.
async def foo() -> AsyncIterable[int]:
    for x in range(3):
        await async_val(x)
        yield x

# Each consumer must produce the same items and pass through the same three
# values awaited inside foo().
yields, val = run_generator(async_iter(foo()))
assert val == [0, 1, 2], val
assert yields == (0, 1, 2), yields

yields, val = run_generator(async_comp(foo()))
assert val == [0, 1, 2], val
assert yields == (0, 1, 2), yields

yields, val = run_generator(async_comp_set(foo()))
assert val == {0, 1, 2}, val
assert yields == (0, 1, 2), yields

yields, val = run_generator(async_comp_dict(foo()))
assert val == {0: '0', 1: '1', 2: '2'}, val
assert yields == (0, 1, 2), yields
[case testAsyncFor2]
from typing import AsyncIterable, List
# `async for` statement that collects items into a list; an exception raised
# by the source iterator propagates out of this coroutine.
async def async_iter(xs: AsyncIterable[int]) -> List[int]:
    ys = []
    async for x in xs:
        ys.append(x)
    return ys
[typing fixtures/typing-full.pyi]
[file driver.py]
from native import async_iter
from testutil import run_generator, async_val
from typing import AsyncIterable, List
# defined here since we couldn't do it inside the test yet...
# Async generator that yields 0..2 and then fails instead of finishing.
# The raise sits after the loop: the driver expects all three values first.
async def foo() -> AsyncIterable[int]:
    for x in range(3):
        await async_val(x)
        yield x
    raise Exception('lol no')

yields, val = run_generator(async_iter(foo()))
assert yields == (0, 1, 2), yields
# NOTE(review): run_generator apparently reports a raised exception through
# `val` as its message -- confirm against testutil.
assert val == 'lol no', val
[case testAsyncWithVarReuse]
# Async context manager that binds 1 to the `as` target.
class ConMan:
    async def __aenter__(self) -> int:
        return 1

    async def __aexit__(self, *exc: object):
        pass

# Async context manager that binds 2 to the `as` target.
class ConManB:
    async def __aenter__(self) -> int:
        return 2

    async def __aexit__(self, *exc: object):
        pass
# Reuses the same `as` variable name (f) in two consecutive `async with`
# statements backed by different manager classes.
async def test_x() -> None:
    value = 2
    async with ConMan() as f:
        value += f
    assert value == 3, value
    async with ConManB() as f:
        value += f
    assert value == 5, value
[case testRunAsyncSpecialCases]
import asyncio
# Coroutine returning a fixed-length tuple.
async def t() -> tuple[int, str, str]:
    return (1, "x", "y")

# Forwards the tuple through a second await frame.
async def f() -> tuple[int, str, str]:
    return await t()
# The tuple produced by t() must arrive unchanged through f().
async def test_tuple_return() -> None:
    result = await f()
    assert result == (1, "x", "y")
# Returns (does not raise) an exception instance.
async def e() -> ValueError:
    return ValueError("foo")

# Forwards the exception object as an ordinary return value.
async def g() -> ValueError:
    return await e()
# An exception instance used as a return value must come back as a value,
# not be raised.
async def test_exception_return() -> None:
    result = await g()
    assert isinstance(result, ValueError)
[file asyncio/__init__.pyi]
async def sleep(t: float) -> None: ...
[typing fixtures/typing-full.pyi]
[case testRunAsyncRefCounting]
import asyncio
import gc
# Awaits fn() once to warm up, then ten more times, and asserts that the
# number of GC-tracked objects grew by at most max_new in between.
#   fn:      zero-argument callable returning an awaitable
#   max_new: largest tolerated growth in len(gc.get_objects())
async def assert_no_leaks(fn, max_new):
    # Warm-up, in case asyncio allocates something on first use
    await fn()

    gc.collect()
    old_objs = gc.get_objects()

    for i in range(10):
        await fn()

    gc.collect()
    new_objs = gc.get_objects()

    delta = len(new_objs) - len(old_objs)
    # Often a few persistent objects get allocated, which may be unavoidable.
    # The main thing we care about is that each iteration does not leak an
    # additional object.
    assert delta <= max_new, delta
# Appends "1" to x.
async def concat_one(x: str) -> str:
    return x + "1"

# Builds a string of n "1" characters, one awaited call per character.
async def foo(n: int) -> str:
    s = ""
    while len(s) < n:
        s = await concat_one(s)
    return s
async def test_trivial() -> None:
    await assert_no_leaks(lambda: foo(1000), 5)

# Returns a one-element list holding a[0], after one awaited call.
async def make_list(a: list[int]) -> list[int]:
    await concat_one("foobar")
    return [a[0]]

# Keeps temporaries (list operands) alive across awaits inside one expression.
async def spill() -> list[int]:
    a: list[int] = []
    for i in range(5):
        await asyncio.sleep(0.0001)
        a = (await make_list(a + [1])) + a + (await make_list(a + [2]))
    return a

# Runs spill() n times, discarding the results.
async def bar(n: int) -> None:
    for i in range(n):
        await spill()

async def test_spilled() -> None:
    await assert_no_leaks(lambda: bar(80), 2)
# Recurses n levels through awaits and raises TypeError at the bottom
# (n == 0), suspending at depths 2 and 0 on the way down.
async def raise_deep(n: int) -> str:
    if n == 0:
        await asyncio.sleep(0.0001)
        raise TypeError(str(n))
    else:
        if n == 2:
            await asyncio.sleep(0.0001)
        return await raise_deep(n - 1)

# Raises (via raise_deep) for multiples of 3, suspends for other multiples
# of 29, and otherwise returns immediately; the result is str(n).
async def maybe_raise(n: int) -> str:
    if n % 3 == 0:
        await raise_deep(5)
    elif n % 29 == 0:
        await asyncio.sleep(0.0001)
    return str(n)

# Collects one string per i in range(n): "0" + str(i) when maybe_raise(i)
# succeeds, "5" when it raises TypeError.
async def exc(n: int) -> list[str]:
    a = []
    for i in range(n):
        try:
            # Pass the loop index, not n: with maybe_raise(n) every iteration
            # took the same path, so for exc(50) the raising branch was never
            # exercised at all.
            a.append(str(int()) + await maybe_raise(i))
        except TypeError:
            a.append(str(int() + 5))
    return a
async def test_exception() -> None:
    await assert_no_leaks(lambda: exc(50), 2)

# Object with a single mutable attribute.
class C:
    def __init__(self, s: str) -> None:
        self.s = s

# Returns its argument unchanged through a coroutine frame.
async def id(c: C) -> C:
    return c

# Assigns an awaited value to an attribute of an awaited object, with
# suspension points before and after, then returns the stored string.
async def stolen_helper(c: C, s: str) -> str:
    await asyncio.sleep(0.0001)
    (await id(c)).s = await concat_one(s)
    await asyncio.sleep(0.0001)
    return c.s

# Runs stolen_helper on n fresh objects; concat_one appends "1", so each
# result is str(i + 2) + "1".
async def stolen(n: int) -> int:
    for i in range(n):
        c = C(str(i))
        s = await stolen_helper(c, str(i + 2))
        assert s == str(i + 2) + "1"
    return n

async def test_stolen() -> None:
    await assert_no_leaks(lambda: stolen(200), 2)
[file asyncio/__init__.pyi]
async def sleep(t: float) -> None: ...
[case testRunAsyncMiscTypesInEnvironment]
# Here we test that values of various kinds of types can be spilled to the
# environment. In particular, types with "overlapping error values" such as
# i64 can be tricky, since they require extra work to support undefined
# attribute values (which raise AttributeError when accessed). For these,
# the object struct has a bitfield which keeps track of whether certain
# attributes have an assigned value.
#
# In practice we mark these attributes as "always defined", which causes these
# checks to be skipped on attribute access, and thus we don't require the
# bitfield to exist.
#
# See the comment of RType.error_overlap for more information.
import asyncio
from mypy_extensions import i64, i32, i16, u8
# One trivial async increment per value type under test.
async def inc_float(x: float) -> float:
    return x + 1.0

async def inc_i64(x: i64) -> i64:
    return x + 1

async def inc_i32(x: i32) -> i32:
    return x + 1

async def inc_i16(x: i16) -> i16:
    return x + 1

async def inc_u8(x: u8) -> u8:
    return x + 1

# Increments the int item by 1 and the float item by 1.5.
async def inc_tuple(x: tuple[i64, float]) -> tuple[i64, float]:
    return x[0] + 1, x[1] + 1.5

async def neg_bool(b: bool) -> bool:
    return not b
# Each *_ops coroutine keeps a local (n) of the tested type alive across two
# awaits; the second await is the right operand of an addition whose left
# operand is already evaluated. Net effect: x + 1 + 1 + (0.5 or 1).
# NOTE(review): the float("0.5") / i64("1") style constructors are presumably
# there to keep the left operand from being a compile-time constant -- confirm.
async def float_ops(x: float) -> float:
    n = x
    n = await inc_float(n)
    n = float("0.5") + await inc_float(n)
    return n

async def test_float() -> None:
    assert await float_ops(2.5) == 5.0

async def i64_ops(x: i64) -> i64:
    n = x
    n = await inc_i64(n)
    n = i64("1") + await inc_i64(n)
    return n

async def test_i64() -> None:
    assert await i64_ops(2) == 5

async def i32_ops(x: i32) -> i32:
    n = x
    n = await inc_i32(n)
    n = i32("1") + await inc_i32(n)
    return n

async def test_i32() -> None:
    assert await i32_ops(3) == 6

async def i16_ops(x: i16) -> i16:
    n = x
    n = await inc_i16(n)
    n = i16("1") + await inc_i16(n)
    return n

async def test_i16() -> None:
    assert await i16_ops(4) == 7

async def u8_ops(x: u8) -> u8:
    n = x
    n = await inc_u8(n)
    n = u8("1") + await inc_u8(n)
    return n

async def test_u8() -> None:
    assert await u8_ops(5) == 8
# Tuple-typed local kept across awaits; the second await is an item of an
# enclosing tuple whose first item is already built. Returns x incremented twice.
async def tuple_ops(x: tuple[i64, float]) -> tuple[i64, float]:
    n = x
    n = await inc_tuple(n)
    m = ((i64("1"), float("0.5")), await inc_tuple(n))
    return m[1]

# (1, 2.5) -> (2, 4.0) -> (3, 5.5)
async def test_tuple() -> None:
    assert await tuple_ops((1, 2.5)) == (3, 5.5)

# bool("1") is True, and negating x twice gives x back, so the result is x.
async def bool_ops(x: bool) -> bool:
    n = x
    n = await neg_bool(n)
    m = (bool("1"), await neg_bool(n))
    return m[0] and m[1]

async def test_bool() -> None:
    assert await bool_ops(True) is True
    assert await bool_ops(False) is False
[file asyncio/__init__.pyi]
def run(x: object) -> object: ...
[case testRunAsyncNestedFunctions]
from __future__ import annotations
import asyncio
from typing import cast, Iterator, overload, Awaitable, Any, TypeVar
from testutil import assertRaises
# Regular function defining a nested coroutine that closes over x, then
# running it to completion with asyncio.run.
def normal_contains_async_def(x: int) -> int:
    async def f(y: int) -> int:
        return x + y

    return 5 + cast(int, asyncio.run(f(6)))

# 5 + (3 + 6)
def test_def_contains_async_def() -> None:
    assert normal_contains_async_def(3) == 14
async def inc(x: int) -> int:
    return x + 1

# Coroutine defining a nested regular function that closes over x; the nested
# function is called with two awaited arguments.
async def async_def_contains_normal(x: int) -> int:
    def nested(y: int, z: int) -> int:
        return x + y + z

    a = x
    a += nested((await inc(3)), (await inc(4)))
    return a
# a starts at x (2), then gains nested(4, 5) == 2 + 4 + 5.
async def test_async_def_contains_normal() -> None:
    assert await async_def_contains_normal(2) == (2 + 2 + 4 + 5)

# Coroutine defining a nested coroutine that closes over x.
async def async_def_contains_async_def(x: int) -> int:
    async def f(y: int) -> int:
        return (await inc(x)) + (await inc(y))

    return (await f(1)) + (await f(2))

async def test_async_def_contains_async_def() -> None:
    assert await async_def_contains_async_def(3) == (3 + 1 + 1 + 1) + (3 + 1 + 2 + 1)
# Coroutine defining a nested generator that closes over x. The generator
# yields exactly two values, so the third next() must raise StopIteration.
async def async_def_contains_generator(x: int) -> tuple[int, int, int]:
    def gen(y: int) -> Iterator[int]:
        yield x + 1
        yield x + y

    it = gen(4)
    res = x + 10, next(it), next(it)

    with assertRaises(StopIteration):
        next(it)

    return res

# (3 + 10, 3 + 1, 3 + 4)
async def test_async_def_contains_generator() -> None:
    assert await async_def_contains_generator(3) == (13, 4, 7)
# Generator defining a nested coroutine that closes over x; each of the first
# two yields runs the coroutine to completion with asyncio.run.
def generator_contains_async_def(x: int) -> Iterator[int]:
    async def f(y: int) -> int:
        return (await inc(x)) + (await inc(y))

    yield cast(int, asyncio.run(f(2)))
    yield cast(int, asyncio.run(f(3)))
    yield x + 10

# inc(5) == 6, inc(2) == 3, inc(3) == 4
def test_generator_contains_async_def() -> None:
    assert list(generator_contains_async_def(5)) == [6 + 3, 6 + 4, 15]
# Coroutine with two nested regular functions, each closing over a different
# parameter (f over x, g over y).
async def async_def_contains_two_nested_functions(x: int, y: int) -> tuple[int, int]:
    def f(a: int) -> int:
        return x + a

    def g(b: int, c: int) -> int:
        return y + b + c

    return (await inc(f(3))), (await inc(g(4, 10)))

async def test_async_def_contains_two_nested_functions() -> None:
    assert await async_def_contains_two_nested_functions(5, 7) == (
        (5 + 3 + 1), (7 + 4 + 10 + 1)
    )
async def async_def_contains_overloaded_async_def(n: int) -> int:
@overload
async def f(x: int) -> int: ...
@overload
async def f(x: str) -> str: ...
async def f(x: int | str) -> Any:
return x
return (await f(n)) + 1
# The nested overloaded coroutine returns its argument; the outer one adds 1.
async def test_async_def_contains_overloaded_async_def() -> None:
    assert await async_def_contains_overloaded_async_def(5) == 6
T = TypeVar("T")

# Identity decorator: returns the decorated object unchanged.
def deco(f: T) -> T:
    return f

# Coroutine defining a nested, decorated coroutine.
async def async_def_contains_decorated_async_def(n: int) -> int:
    @deco
    async def f(x: int) -> int:
        return x + 2

    return (await f(n)) + 1
# 7 + 2 from the nested coroutine, + 1 from the outer one.
async def test_async_def_contains_decorated_async_def() -> None:
    assert await async_def_contains_decorated_async_def(7) == 10
[file asyncio/__init__.pyi]
def run(x: object) -> object: ...
[case testAsyncTryFinallyMixedReturn]
# This used to raise an AttributeError, when:
# - the try block contains multiple paths
# - at least one of those explicitly returns
# - at least one of those does not explicitly return
# - the non-returning path is taken at runtime
# try/finally where only one path through the `try` body returns explicitly;
# the other path falls through to the return after the `finally`.
async def mixed_return(b: bool) -> bool:
    try:
        if b:
            return b
    finally:
        pass
    return b
# Exercises both paths of mixed_return: the explicit return inside `try`
# and the fall-through return after `finally`.
async def test_async_try_finally_mixed_return() -> None:
    # Test return path
    result1 = await mixed_return(True)
    assert result1 == True

    # Test non-return path
    result2 = await mixed_return(False)
    assert result2 == False
[case testAsyncWithMixedReturn]
# This used to raise an AttributeError, related to
# testAsyncTryFinallyMixedReturn, this is essentially
# a far more extensive version of that test surfacing
# more edge cases
from typing import Optional, Type, Literal
# Minimal async context manager: enters to itself and never suppresses
# exceptions (__aexit__ returns False).
class AsyncContextManager:
    async def __aenter__(self) -> "AsyncContextManager":
        return self

    async def __aexit__(
        self,
        t: Optional[Type[BaseException]],
        v: Optional[BaseException],
        tb: object,
    ) -> Literal[False]:
        return False

# Simple async functions (generator class)

# Mixed: one branch returns inside the `async with`, the other falls through
# to the return after it.
async def gen_1(b: bool) -> bool:
    async with AsyncContextManager():
        if b:
            return b
    return b

# Every branch returns inside the `async with`.
async def gen_2(b: bool) -> bool:
    async with AsyncContextManager():
        if b:
            return b
        else:
            return b

# Mixed: explicit `pass` branch falls through to the trailing return.
async def gen_3(b: bool) -> bool:
    async with AsyncContextManager():
        if b:
            return b
        else:
            pass
    return b

# No return inside the `async with`; the value is carried out in a local.
async def gen_4(b: bool) -> bool:
    ret: bool
    async with AsyncContextManager():
        if b:
            ret = b
        else:
            ret = b
    return ret

# Several branches, only some of which return inside the `async with`.
async def gen_5(i: int) -> int:
    async with AsyncContextManager():
        if i == 1:
            return i
        elif i == 2:
            pass
        elif i == 3:
            return i
    return i

# Every explicit branch returns; an unmatched value falls through implicitly.
async def gen_6(i: int) -> int:
    async with AsyncContextManager():
        if i == 1:
            return i
        elif i == 2:
            return i
        elif i == 3:
            return i
    return i

# Every branch, including the final else, returns inside the `async with`.
async def gen_7(i: int) -> int:
    async with AsyncContextManager():
        if i == 1:
            return i
        elif i == 2:
            return i
        elif i == 3:
            return i
        else:
            return i

# Async functions with nested functions (environment class)
# These repeat gen_1..gen_7, each with a nested helper() whose result is
# returned (or stored) on the first branch.

async def env_1(b: bool) -> bool:
    def helper() -> bool:
        return True

    async with AsyncContextManager():
        if b:
            return helper()
    return b

async def env_2(b: bool) -> bool:
    def helper() -> bool:
        return True

    async with AsyncContextManager():
        if b:
            return helper()
        else:
            return b

async def env_3(b: bool) -> bool:
    def helper() -> bool:
        return True

    async with AsyncContextManager():
        if b:
            return helper()
        else:
            pass
    return b

async def env_4(b: bool) -> bool:
    def helper() -> bool:
        return True

    ret: bool
    async with AsyncContextManager():
        if b:
            ret = helper()
        else:
            ret = b
    return ret

async def env_5(i: int) -> int:
    def helper() -> int:
        return 1

    async with AsyncContextManager():
        if i == 1:
            return helper()
        elif i == 2:
            pass
        elif i == 3:
            return i
    return i

async def env_6(i: int) -> int:
    def helper() -> int:
        return 1

    async with AsyncContextManager():
        if i == 1:
            return helper()
        elif i == 2:
            return i
        elif i == 3:
            return i
    return i

async def env_7(i: int) -> int:
    def helper() -> int:
        return 1

    async with AsyncContextManager():
        if i == 1:
            return helper()
        elif i == 2:
            return i
        elif i == 3:
            return i
        else:
            return i
# Calls every gen_* / env_* variant with each input that selects a distinct
# branch and checks that the input comes back as the result.
async def test_async_with_mixed_return() -> None:
    # Test simple async functions (generator class)
    # gen_1: mixed return/no-return
    assert await gen_1(True) is True
    assert await gen_1(False) is False

    # gen_2: all branches return
    assert await gen_2(True) is True
    assert await gen_2(False) is False

    # gen_3: mixed return/pass
    assert await gen_3(True) is True
    assert await gen_3(False) is False

    # gen_4: no returns in async with
    assert await gen_4(True) is True
    assert await gen_4(False) is False

    # gen_5: multiple branches, some return
    assert await gen_5(0) == 0
    assert await gen_5(1) == 1
    assert await gen_5(2) == 2
    assert await gen_5(3) == 3

    # gen_6: all explicit branches return, implicit fallthrough
    assert await gen_6(0) == 0
    assert await gen_6(1) == 1
    assert await gen_6(2) == 2
    assert await gen_6(3) == 3

    # gen_7: all branches return including else
    assert await gen_7(0) == 0
    assert await gen_7(1) == 1
    assert await gen_7(2) == 2
    assert await gen_7(3) == 3

    # Test async functions with nested functions (environment class)
    # env_1: mixed return/no-return
    assert await env_1(True) is True
    assert await env_1(False) is False

    # env_2: all branches return
    assert await env_2(True) is True
    assert await env_2(False) is False

    # env_3: mixed return/pass
    assert await env_3(True) is True
    assert await env_3(False) is False

    # env_4: no returns in async with
    assert await env_4(True) is True
    assert await env_4(False) is False

    # env_5: multiple branches, some return
    assert await env_5(0) == 0
    assert await env_5(1) == 1
    assert await env_5(2) == 2
    assert await env_5(3) == 3

    # env_6: all explicit branches return, implicit fallthrough
    assert await env_6(0) == 0
    assert await env_6(1) == 1
    assert await env_6(2) == 2
    assert await env_6(3) == 3

    # env_7: all branches return including else
    assert await env_7(0) == 0
    assert await env_7(1) == 1
    assert await env_7(2) == 2
    assert await env_7(3) == 3
[case testAsyncTryExceptFinallyAwait]
import asyncio
from testutil import assertRaises
# Exception type raised by the no-await control case.
class TestError(Exception):
    pass
# Test 0: Simplest case - just try/finally with raise and await
async def simple_try_finally_await() -> None:
    try:
        raise ValueError("simple error")
    finally:
        await asyncio.sleep(0)

# Test 1: Raise inside try, catch in except, don't re-raise
async def async_try_except_no_reraise() -> int:
    try:
        raise ValueError("test error")
        return 1  # Never reached
    except ValueError:
        return 2  # Should return this
    finally:
        await asyncio.sleep(0)
    return 3  # Should not reach this

# Test 2: Raise inside try, catch in except, re-raise
async def async_try_except_reraise() -> int:
    try:
        raise ValueError("test error")
        return 1  # Never reached
    except ValueError:
        raise  # Re-raise the exception
    finally:
        await asyncio.sleep(0)
    return 2  # Should not reach this

# Test 3: Raise inside try, catch in except, raise different error
async def async_try_except_raise_different() -> int:
    try:
        raise ValueError("original error")
        return 1  # Never reached
    except ValueError:
        raise RuntimeError("different error")
    finally:
        await asyncio.sleep(0)
    return 2  # Should not reach this

# Test 4: Another try/except block inside finally
# The inner error is handled inside the finally block, so the outer
# ValueError is the one that propagates.
async def async_try_except_inside_finally() -> int:
    try:
        raise ValueError("outer error")
        return 1  # Never reached
    finally:
        await asyncio.sleep(0)
        try:
            raise RuntimeError("inner error")
        except RuntimeError:
            pass  # Catch inner error
    return 2  # What happens after finally with inner exception handled?

# Test 5: Another try/finally block inside finally
# The inner error is not handled, so it replaces the outer ValueError.
async def async_try_finally_inside_finally() -> int:
    try:
        raise ValueError("outer error")
        return 1  # Never reached
    finally:
        await asyncio.sleep(0)
        try:
            raise RuntimeError("inner error")
        finally:
            await asyncio.sleep(0)
    return 2  # Should not reach this
# Control case: No await in finally - should work correctly
async def async_exception_no_await_in_finally() -> None:
    """Control case: This works correctly - exception propagates"""
    try:
        raise TestError("This exception will propagate!")
    finally:
        pass  # No await here

# Test function with no exception to check normal flow
# The value returned from `try` must survive the await in `finally`.
async def async_no_exception_with_await_in_finally() -> int:
    try:
        return 1  # Normal return
    finally:
        await asyncio.sleep(0)
    return 2  # Should not reach this
# Runs every scenario above and checks which exception (if any) escapes a
# try/finally whose `finally` block awaits.
async def test_async_try_except_finally_await() -> None:
    # Test 0: Simplest case - just try/finally with exception
    # Expected: ValueError propagates
    with assertRaises(ValueError):
        await simple_try_finally_await()

    # Test 1: Exception caught, not re-raised
    # Expected: return 2 (from except block)
    result = await async_try_except_no_reraise()
    assert result == 2, f"Expected 2, got {result}"

    # Test 2: Exception caught and re-raised
    # Expected: ValueError propagates
    with assertRaises(ValueError):
        await async_try_except_reraise()

    # Test 3: Exception caught, different exception raised
    # Expected: RuntimeError propagates
    with assertRaises(RuntimeError):
        await async_try_except_raise_different()

    # Test 4: Try/except inside finally
    # Expected: ValueError propagates (outer exception)
    with assertRaises(ValueError):
        await async_try_except_inside_finally()

    # Test 5: Try/finally inside finally
    # Expected: RuntimeError propagates (inner error)
    with assertRaises(RuntimeError):
        await async_try_finally_inside_finally()

    # Control case: No await in finally (should work correctly)
    with assertRaises(TestError):
        await async_exception_no_await_in_finally()

    # Test normal flow (no exception)
    # Expected: return 1
    result = await async_no_exception_with_await_in_finally()
    assert result == 1, f"Expected 1, got {result}"
[file asyncio/__init__.pyi]
async def sleep(t: float) -> None: ...
[case testAsyncContextManagerExceptionHandling]
import asyncio
from typing import Optional, Type
from testutil import assertRaises
# Test 1: Basic async context manager that doesn't suppress exceptions
class AsyncContextManager:
    async def __aenter__(self) -> 'AsyncContextManager':
        return self

    async def __aexit__(self, exc_type: Optional[Type[BaseException]],
                        exc_val: Optional[BaseException],
                        exc_tb: object) -> None:
        # This await in __aexit__ is like await in finally
        await asyncio.sleep(0)
        # Don't suppress the exception (return None/False)

# Raises inside the `async with` body; both returns are unreachable.
async def func_with_async_context_manager() -> str:
    async with AsyncContextManager():
        raise ValueError("Exception inside async with")
        return "should not reach"  # Never reached
    return "should not reach either"  # Never reached
# Reports, as a string, which outcome func_with_async_context_manager had;
# only the ValueError branch is the expected one.
async def test_basic_exception() -> str:
    try:
        await func_with_async_context_manager()
        return "func_a returned normally - bug!"
    except ValueError:
        return "caught ValueError - correct!"
    except Exception as e:
        return f"caught different exception: {type(e).__name__}"
# Test 2: Async context manager that raises a different exception in __aexit__
class AsyncContextManagerRaisesInExit:
    async def __aenter__(self) -> 'AsyncContextManagerRaisesInExit':
        return self

    async def __aexit__(self, exc_type: Optional[Type[BaseException]],
                        exc_val: Optional[BaseException],
                        exc_tb: object) -> None:
        # This await in __aexit__ is like await in finally
        await asyncio.sleep(0)
        # Raise a different exception - this should replace the original exception
        raise RuntimeError("Exception in __aexit__")

# Raises ValueError inside the body; __aexit__ then raises RuntimeError.
async def func_with_raising_context_manager() -> str:
    async with AsyncContextManagerRaisesInExit():
        raise ValueError("Original exception")
        return "should not reach"  # Never reached
    return "should not reach either"  # Never reached
# Reports, as a string, which exception escaped
# func_with_raising_context_manager; RuntimeError is the expected one.
async def test_exception_in_aexit() -> str:
    try:
        await func_with_raising_context_manager()
        return "func returned normally - unexpected!"
    except RuntimeError:
        return "caught RuntimeError - correct!"
    except ValueError:
        return "caught ValueError - original exception not replaced!"
    except Exception as e:
        return f"caught different exception: {type(e).__name__}"

async def test_async_context_manager_exception_handling() -> None:
    # Test 1: Basic exception propagation
    result = await test_basic_exception()
    # Expected: "caught ValueError - correct!"
    assert result == "caught ValueError - correct!", f"Expected exception to propagate, got: {result}"

    # Test 2: Exception raised in __aexit__ replaces original exception
    result = await test_exception_in_aexit()
    # Expected: "caught RuntimeError - correct!"
    # (The RuntimeError from __aexit__ should replace the ValueError)
    assert result == "caught RuntimeError - correct!", f"Expected RuntimeError from __aexit__, got: {result}"
[file asyncio/__init__.pyi]
async def sleep(t: float) -> None: ...
[case testCallableArgWithSameNameAsHelperMethod]
import asyncio
from typing import Awaitable, Callable
MyCallable = Callable[[int, int], Awaitable[int]]

async def add(a: int, b: int) -> int:
    return a + b

# The parameter is deliberately named `send`, as the case name says: it must
# be treated as the callable argument.
async def await_send(send: MyCallable) -> int:
    return await send(1, 2)

# Same check with a parameter named `throw`.
async def await_throw(throw: MyCallable) -> int:
    return await throw(3, 4)
# add(1, 2) through `send`, add(3, 4) through `throw`.
async def tests() -> None:
    assert await await_send(add) == 3
    assert await await_throw(add) == 7

# Synchronous entry point that runs the async checks to completion.
def test_callable_arg_same_name_as_helper() -> None:
    asyncio.run(tests())
[file asyncio/__init__.pyi]
def run(x: object) -> object: ...
[case testRunAsyncCancelFinallySpecialCase]
import asyncio
from testutil import assertRaises
# Greatly simplified from asyncio.Condition
class Condition:
    async def acquire(self) -> None: pass

    # Waits on a fresh future from the running loop. If the wait is
    # interrupted (e.g. cancelled), the nested finally blocks still run:
    # the future is popped from `a`, acquire() is awaited, and the original
    # exception then propagates through the outer except/raise.
    async def wait(self) -> bool:
        l = asyncio.get_running_loop()
        fut = l.create_future()
        a = []
        try:
            try:
                a.append(fut)
                try:
                    await fut
                    return True
                finally:
                    a.pop()
            finally:
                err = None
                # Retry acquire() until it completes, remembering the last
                # CancelledError seen while doing so.
                while True:
                    try:
                        await self.acquire()
                        break
                    except asyncio.CancelledError as e:
                        err = e

                if err is not None:
                    try:
                        raise err
                    finally:
                        # Clear the local after raising it.
                        err = None
        except BaseException:
            raise
# Starts Condition.wait() as a task, schedules its cancellation on the loop,
# and expects CancelledError when the task is awaited.
async def do_cancel() -> None:
    cond = Condition()
    wait = asyncio.create_task(cond.wait())
    asyncio.get_running_loop().call_soon(wait.cancel)
    with assertRaises(asyncio.CancelledError):
        await wait

# Synchronous entry point that runs the cancellation scenario.
def test_cancel_special_case() -> None:
    asyncio.run(do_cancel())
[file asyncio/__init__.pyi]
from typing import Any
class CancelledError(Exception): ...
def run(x: object) -> object: ...
def get_running_loop() -> Any: ...
def create_task(x: object) -> Any: ...