| # async test cases (compile and run) |
| |
| [case testRunAsyncBasics] |
| import asyncio |
| from typing import Callable, Awaitable |
| |
| from testutil import assertRaises |
| |
| async def h() -> int: |
| return 1 |
| |
| async def g() -> int: |
| await asyncio.sleep(0) |
| return await h() |
| |
| async def f() -> int: |
| return await g() + 2 |
| |
| async def f2() -> int: |
| x = 0 |
| for i in range(2): |
| x += i + await f() + await g() |
| return x |
| |
| async def test_simple_call() -> None: |
| result = await f() |
| assert result == 3 |
| |
| async def test_multiple_awaits_in_expression() -> None: |
| result = await f2() |
| assert result == 9 |
| |
| class MyError(Exception): |
| pass |
| |
| async def exc1() -> None: |
| await asyncio.sleep(0) |
| raise MyError() |
| |
| async def exc2() -> None: |
| await asyncio.sleep(0) |
| raise MyError() |
| |
| async def exc3() -> None: |
| await exc1() |
| |
| async def exc4() -> None: |
| await exc2() |
| |
| async def exc5() -> int: |
| try: |
| await exc1() |
| except MyError: |
| return 3 |
| return 4 |
| |
| async def exc6() -> int: |
| try: |
| await exc4() |
| except MyError: |
| return 3 |
| return 4 |
| |
| async def test_exception() -> None: |
| with assertRaises(MyError): |
| await exc1() |
| with assertRaises(MyError): |
| await exc2() |
| with assertRaises(MyError): |
| await exc3() |
| with assertRaises(MyError): |
| await exc4() |
| assert await exc5() == 3 |
| assert await exc6() == 3 |
| |
| async def indirect_call(x: int, c: Callable[[int], Awaitable[int]]) -> int: |
| return await c(x) |
| |
| async def indirect_call_2(a: Awaitable[None]) -> None: |
| await a |
| |
| async def indirect_call_3(a: Awaitable[float]) -> float: |
| return (await a) + 1.0 |
| |
| async def inc(x: int) -> int: |
| await asyncio.sleep(0) |
| return x + 1 |
| |
| async def ident(x: float, err: bool = False) -> float: |
| await asyncio.sleep(0.0) |
| if err: |
| raise MyError() |
| return x + float("0.0") |
| |
| async def test_indirect_call() -> None: |
| assert await indirect_call(3, inc) == 4 |
| |
| with assertRaises(MyError): |
| await indirect_call_2(exc1()) |
| |
| assert await indirect_call_3(ident(2.0)) == 3.0 |
| assert await indirect_call_3(ident(-113.0)) == -112.0 |
| assert await indirect_call_3(ident(-114.0)) == -113.0 |
| |
| with assertRaises(MyError): |
| await indirect_call_3(ident(1.0, True)) |
| with assertRaises(MyError): |
| await indirect_call_3(ident(-113.0, True)) |
| |
| class C: |
| def __init__(self, n: int) -> None: |
| self.n = n |
| |
| async def add(self, x: int, err: bool = False) -> int: |
| await asyncio.sleep(0) |
| if err: |
| raise MyError() |
| return x + self.n |
| |
| async def method_call(x: int) -> int: |
| c = C(5) |
| return await c.add(x) |
| |
| async def method_call_exception() -> int: |
| c = C(5) |
| return await c.add(3, err=True) |
| |
| async def test_async_method_call() -> None: |
| assert await method_call(3) == 8 |
| with assertRaises(MyError): |
| await method_call_exception() |
| |
| [file asyncio/__init__.pyi] |
| async def sleep(t: float) -> None: ... |
| |
| [typing fixtures/typing-full.pyi] |
| |
| [case testRunAsyncAwaitInVariousPositions] |
| from typing import cast, Any |
| |
| import asyncio |
| |
| async def one() -> int: |
| await asyncio.sleep(0.0) |
| return int() + 1 |
| |
| async def true() -> bool: |
| return bool(int() + await one()) |
| |
| async def branch_await() -> int: |
| if bool(int() + 1) == await true(): |
| return 3 |
| return 2 |
| |
| async def branch_await_not() -> int: |
| if bool(int() + 1) == (not await true()): |
| return 3 |
| return 2 |
| |
| async def test_branch() -> None: |
| assert await branch_await() == 3 |
| assert await branch_await_not() == 2 |
| |
| async def assign_multi() -> int: |
| _, x = int(), await one() |
| return x + 1 |
| |
| async def test_assign_multi() -> None: |
| assert await assign_multi() == 2 |
| |
| class C: |
| def __init__(self, s: str) -> None: |
| self.s = s |
| |
| def concat(self, s: str) -> str: |
| return self.s + s |
| |
| async def make_c(s: str) -> C: |
| await one() |
| return C(s) |
| |
| async def concat(s: str, t: str) -> str: |
| await one() |
| return s + t |
| |
| async def set_attr(s: str) -> None: |
| (await make_c("xyz")).s = await concat(s, "!") |
| |
| async def test_set_attr() -> None: |
| await set_attr("foo") # Just check that it compiles and runs |
| |
| def concat2(x: str, y: str) -> str: |
| return x + y |
| |
| async def call1(s: str) -> str: |
| return concat2(str(int()), await concat(s, "a")) |
| |
| async def call2(s: str) -> str: |
| return await concat(str(int()), await concat(s, "b")) |
| |
| async def test_call() -> None: |
| assert await call1("foo") == "0fooa" |
| assert await call2("foo") == "0foob" |
| |
| async def method_call(s: str) -> str: |
| return C("<").concat(await concat(s, ">")) |
| |
| async def test_method_call() -> None: |
| assert await method_call("foo") == "<foo>" |
| |
| class D: |
| def __init__(self, a: str, b: str) -> None: |
| self.a = a |
| self.b = b |
| |
| async def construct(s: str) -> str: |
| c = D(await concat(s, "!"), await concat(s, "?")) |
| return c.a + c.b |
| |
| async def test_construct() -> None: |
| assert await construct("foo") == "foo!foo?" |
| |
| [file asyncio/__init__.pyi] |
| async def sleep(t: float) -> None: ... |
| |
| [typing fixtures/typing-full.pyi] |
| |
| |
| [case testAsyncWith] |
| from testutil import async_val |
| |
| class async_ctx: |
| async def __aenter__(self) -> str: |
| await async_val("enter") |
| return "test" |
| |
| async def __aexit__(self, x, y, z) -> None: |
| await async_val("exit") |
| |
| |
| async def async_with() -> str: |
| async with async_ctx() as x: |
| return await async_val("body") |
| |
| |
| [file driver.py] |
| from native import async_with |
| from testutil import run_generator |
| |
| yields, val = run_generator(async_with(), [None, 'x', None]) |
| assert yields == ('enter', 'body', 'exit'), yields |
| assert val == 'x', val |
| |
| |
| [case testAsyncReturn] |
| from testutil import async_val |
| |
| async def async_return() -> str: |
| try: |
| return 'test' |
| finally: |
| await async_val('foo') |
| |
| [file driver.py] |
| from native import async_return |
| from testutil import run_generator |
| |
| yields, val = run_generator(async_return()) |
| assert yields == ('foo',) |
| assert val == 'test', val |
| |
| [case testAsyncFor] |
| from typing import AsyncIterable, List, Set, Dict |
| |
| async def async_iter(xs: AsyncIterable[int]) -> List[int]: |
| ys = [] |
| async for x in xs: |
| ys.append(x) |
| return ys |
| |
| async def async_comp(xs: AsyncIterable[int]) -> List[int]: |
| ys = [x async for x in xs] |
| return ys |
| |
| async def async_comp_set(xs: AsyncIterable[int]) -> Set[int]: |
| return {x async for x in xs} |
| |
| async def async_comp_dict(xs: AsyncIterable[int]) -> Dict[int, str]: |
| return {x: str(x) async for x in xs} |
| |
| [typing fixtures/typing-full.pyi] |
| |
| [file driver.py] |
| from native import async_iter, async_comp, async_comp_set, async_comp_dict |
| from testutil import run_generator, async_val |
| from typing import AsyncIterable, List |
| |
| # defined here since we couldn't do it inside the test yet... |
| async def foo() -> AsyncIterable[int]: |
| for x in range(3): |
| await async_val(x) |
| yield x |
| |
| yields, val = run_generator(async_iter(foo())) |
| assert val == [0,1,2], val |
| assert yields == (0,1,2), yields |
| |
| yields, val = run_generator(async_comp(foo())) |
| assert val == [0,1,2], val |
| assert yields == (0,1,2), yields |
| |
| yields, val = run_generator(async_comp_set(foo())) |
| assert val == {0,1,2}, val |
| assert yields == (0,1,2), yields |
| |
| yields, val = run_generator(async_comp_dict(foo())) |
| assert val == {0: '0',1: '1', 2: '2'}, val |
| assert yields == (0,1,2), yields |
| |
| [case testAsyncFor2] |
| from typing import AsyncIterable, List |
| |
| async def async_iter(xs: AsyncIterable[int]) -> List[int]: |
| ys = [] |
| async for x in xs: |
| ys.append(x) |
| return ys |
| |
| [typing fixtures/typing-full.pyi] |
| |
| [file driver.py] |
| from native import async_iter |
| from testutil import run_generator, async_val |
| from typing import AsyncIterable, List |
| |
| # defined here since we couldn't do it inside the test yet... |
| async def foo() -> AsyncIterable[int]: |
| for x in range(3): |
| await async_val(x) |
| yield x |
| raise Exception('lol no') |
| |
| yields, val = run_generator(async_iter(foo())) |
| assert yields == (0,1,2), yields |
| assert val == 'lol no', val |
| |
| [case testAsyncWithVarReuse] |
| class ConMan: |
| async def __aenter__(self) -> int: |
| return 1 |
| async def __aexit__(self, *exc: object): |
| pass |
| |
| class ConManB: |
| async def __aenter__(self) -> int: |
| return 2 |
| async def __aexit__(self, *exc: object): |
| pass |
| |
| async def test_x() -> None: |
| value = 2 |
| async with ConMan() as f: |
| value += f |
| assert value == 3, value |
| async with ConManB() as f: |
| value += f |
| assert value == 5, value |
| |
| [case testRunAsyncSpecialCases] |
| import asyncio |
| |
| async def t() -> tuple[int, str, str]: |
| return (1, "x", "y") |
| |
| async def f() -> tuple[int, str, str]: |
| return await t() |
| |
| async def test_tuple_return() -> None: |
| result = await f() |
| assert result == (1, "x", "y") |
| |
| async def e() -> ValueError: |
| return ValueError("foo") |
| |
| async def g() -> ValueError: |
| return await e() |
| |
| async def test_exception_return() -> None: |
| result = await g() |
| assert isinstance(result, ValueError) |
| |
| [file asyncio/__init__.pyi] |
| async def sleep(t: float) -> None: ... |
| |
| [typing fixtures/typing-full.pyi] |
| |
| [case testRunAsyncRefCounting] |
| import asyncio |
| import gc |
| |
| async def assert_no_leaks(fn, max_new): |
| # Warm-up, in case asyncio allocates something on first use |
| await fn() |
| |
| gc.collect() |
| old_objs = gc.get_objects() |
| |
| for i in range(10): |
| await fn() |
| |
| gc.collect() |
| new_objs = gc.get_objects() |
| |
| delta = len(new_objs) - len(old_objs) |
| # Often a few persistent objects get allocated, which may be unavoidable. |
| # The main thing we care about is that each iteration does not leak an |
| # additional object. |
| assert delta <= max_new, delta |
| |
| async def concat_one(x: str) -> str: |
| return x + "1" |
| |
| async def foo(n: int) -> str: |
| s = "" |
| while len(s) < n: |
| s = await concat_one(s) |
| return s |
| |
| async def test_trivial() -> None: |
| await assert_no_leaks(lambda: foo(1000), 5) |
| |
| async def make_list(a: list[int]) -> list[int]: |
| await concat_one("foobar") |
| return [a[0]] |
| |
| async def spill() -> list[int]: |
| a: list[int] = [] |
| for i in range(5): |
| await asyncio.sleep(0.0001) |
| a = (await make_list(a + [1])) + a + (await make_list(a + [2])) |
| return a |
| |
| async def bar(n: int) -> None: |
| for i in range(n): |
| await spill() |
| |
| async def test_spilled() -> None: |
| await assert_no_leaks(lambda: bar(80), 2) |
| |
| async def raise_deep(n: int) -> str: |
| if n == 0: |
| await asyncio.sleep(0.0001) |
| raise TypeError(str(n)) |
| else: |
| if n == 2: |
| await asyncio.sleep(0.0001) |
| return await raise_deep(n - 1) |
| |
| async def maybe_raise(n: int) -> str: |
| if n % 3 == 0: |
| await raise_deep(5) |
| elif n % 29 == 0: |
| await asyncio.sleep(0.0001) |
| return str(n) |
| |
| async def exc(n: int) -> list[str]: |
| a = [] |
| for i in range(n): |
| try: |
| a.append(str(int()) + await maybe_raise(n)) |
| except TypeError: |
| a.append(str(int() + 5)) |
| return a |
| |
| async def test_exception() -> None: |
| await assert_no_leaks(lambda: exc(50), 2) |
| |
| class C: |
| def __init__(self, s: str) -> None: |
| self.s = s |
| |
| async def id(c: C) -> C: |
| return c |
| |
| async def stolen_helper(c: C, s: str) -> str: |
| await asyncio.sleep(0.0001) |
| (await id(c)).s = await concat_one(s) |
| await asyncio.sleep(0.0001) |
| return c.s |
| |
| async def stolen(n: int) -> int: |
| for i in range(n): |
| c = C(str(i)) |
| s = await stolen_helper(c, str(i + 2)) |
| assert s == str(i + 2) + "1" |
| return n |
| |
| async def test_stolen() -> None: |
| await assert_no_leaks(lambda: stolen(200), 2) |
| |
| [file asyncio/__init__.pyi] |
| async def sleep(t: float) -> None: ... |
| |
| [case testRunAsyncMiscTypesInEnvironment] |
| # Here we test that values of various kinds of types can be spilled to the |
| # environment. In particular, types with "overlapping error values" such as |
| # i64 can be tricky, since they require extra work to support undefined |
| # attribute values (which raise AttributeError when accessed). For these, |
| # the object struct has a bitfield which keeps track of whether certain |
| # attributes have an assigned value. |
| # |
| # In practice we mark these attributes as "always defined", which causes these |
| # checks to be skipped on attribute access, and thus we don't require the |
| # bitfield to exist. |
| # |
| # See the comment of RType.error_overlap for more information. |
| |
| import asyncio |
| |
| from mypy_extensions import i64, i32, i16, u8 |
| |
| async def inc_float(x: float) -> float: |
| return x + 1.0 |
| |
| async def inc_i64(x: i64) -> i64: |
| return x + 1 |
| |
| async def inc_i32(x: i32) -> i32: |
| return x + 1 |
| |
| async def inc_i16(x: i16) -> i16: |
| return x + 1 |
| |
| async def inc_u8(x: u8) -> u8: |
| return x + 1 |
| |
| async def inc_tuple(x: tuple[i64, float]) -> tuple[i64, float]: |
| return x[0] + 1, x[1] + 1.5 |
| |
| async def neg_bool(b: bool) -> bool: |
| return not b |
| |
| async def float_ops(x: float) -> float: |
| n = x |
| n = await inc_float(n) |
| n = float("0.5") + await inc_float(n) |
| return n |
| |
| async def test_float() -> None: |
| assert await float_ops(2.5) == 5.0 |
| |
| async def i64_ops(x: i64) -> i64: |
| n = x |
| n = await inc_i64(n) |
| n = i64("1") + await inc_i64(n) |
| return n |
| |
| async def test_i64() -> None: |
| assert await i64_ops(2) == 5 |
| |
| async def i32_ops(x: i32) -> i32: |
| n = x |
| n = await inc_i32(n) |
| n = i32("1") + await inc_i32(n) |
| return n |
| |
| async def test_i32() -> None: |
| assert await i32_ops(3) == 6 |
| |
| async def i16_ops(x: i16) -> i16: |
| n = x |
| n = await inc_i16(n) |
| n = i16("1") + await inc_i16(n) |
| return n |
| |
| async def test_i16() -> None: |
| assert await i16_ops(4) == 7 |
| |
| async def u8_ops(x: u8) -> u8: |
| n = x |
| n = await inc_u8(n) |
| n = u8("1") + await inc_u8(n) |
| return n |
| |
| async def test_u8() -> None: |
| assert await u8_ops(5) == 8 |
| |
| async def tuple_ops(x: tuple[i64, float]) -> tuple[i64, float]: |
| n = x |
| n = await inc_tuple(n) |
| m = ((i64("1"), float("0.5")), await inc_tuple(n)) |
| return m[1] |
| |
| async def test_tuple() -> None: |
| assert await tuple_ops((1, 2.5)) == (3, 5.5) |
| |
| async def bool_ops(x: bool) -> bool: |
| n = x |
| n = await neg_bool(n) |
| m = (bool("1"), await neg_bool(n)) |
| return m[0] and m[1] |
| |
| async def test_bool() -> None: |
| assert await bool_ops(True) is True |
| assert await bool_ops(False) is False |
| |
| [file asyncio/__init__.pyi] |
| def run(x: object) -> object: ... |
| |
| [case testRunAsyncNestedFunctions] |
| from __future__ import annotations |
| |
| import asyncio |
| from typing import cast, Iterator, overload, Awaitable, Any, TypeVar |
| |
| from testutil import assertRaises |
| |
| def normal_contains_async_def(x: int) -> int: |
| async def f(y: int) -> int: |
| return x + y |
| |
| return 5 + cast(int, asyncio.run(f(6))) |
| |
| def test_def_contains_async_def() -> None: |
| assert normal_contains_async_def(3) == 14 |
| |
| async def inc(x: int) -> int: |
| return x + 1 |
| |
| async def async_def_contains_normal(x: int) -> int: |
| def nested(y: int, z: int) -> int: |
| return x + y + z |
| |
| a = x |
| a += nested((await inc(3)), (await inc(4))) |
| return a |
| |
| async def test_async_def_contains_normal() -> None: |
| assert await async_def_contains_normal(2) == (2 + 2 + 4 + 5) |
| |
| async def async_def_contains_async_def(x: int) -> int: |
| async def f(y: int) -> int: |
| return (await inc(x)) + (await inc(y)) |
| |
| return (await f(1)) + (await f(2)) |
| |
| async def test_async_def_contains_async_def() -> None: |
| assert await async_def_contains_async_def(3) == (3 + 1 + 1 + 1) + (3 + 1 + 2 + 1) |
| |
| async def async_def_contains_generator(x: int) -> tuple[int, int, int]: |
| def gen(y: int) -> Iterator[int]: |
| yield x + 1 |
| yield x + y |
| |
| it = gen(4) |
| res = x + 10, next(it), next(it) |
| |
| with assertRaises(StopIteration): |
| next(it) |
| |
| return res |
| |
| async def test_async_def_contains_generator() -> None: |
| assert await async_def_contains_generator(3) == (13, 4, 7) |
| |
| def generator_contains_async_def(x: int) -> Iterator[int]: |
| async def f(y: int) -> int: |
| return (await inc(x)) + (await inc(y)) |
| |
| yield cast(int, asyncio.run(f(2))) |
| yield cast(int, asyncio.run(f(3))) |
| yield x + 10 |
| |
| def test_generator_contains_async_def() -> None: |
| assert list(generator_contains_async_def(5)) == [6 + 3, 6 + 4, 15] |
| |
| async def async_def_contains_two_nested_functions(x: int, y: int) -> tuple[int, int]: |
| def f(a: int) -> int: |
| return x + a |
| |
| def g(b: int, c: int) -> int: |
| return y + b + c |
| |
| return (await inc(f(3))), (await inc(g(4, 10))) |
| |
| async def test_async_def_contains_two_nested_functions() -> None: |
| assert await async_def_contains_two_nested_functions(5, 7) == ( |
| (5 + 3 + 1), (7 + 4 + 10 + 1) |
| ) |
| |
| async def async_def_contains_overloaded_async_def(n: int) -> int: |
| @overload |
| async def f(x: int) -> int: ... |
| |
| @overload |
| async def f(x: str) -> str: ... |
| |
| async def f(x: int | str) -> Any: |
| return x |
| |
| return (await f(n)) + 1 |
| |
| |
| async def test_async_def_contains_overloaded_async_def() -> None: |
| assert await async_def_contains_overloaded_async_def(5) == 6 |
| |
| T = TypeVar("T") |
| |
| def deco(f: T) -> T: |
| return f |
| |
| async def async_def_contains_decorated_async_def(n: int) -> int: |
| @deco |
| async def f(x: int) -> int: |
| return x + 2 |
| |
| return (await f(n)) + 1 |
| |
| |
| async def test_async_def_contains_decorated_async_def() -> None: |
| assert await async_def_contains_decorated_async_def(7) == 10 |
| |
| [file asyncio/__init__.pyi] |
| def run(x: object) -> object: ... |
| |
| [case testAsyncTryFinallyMixedReturn] |
| # This used to raise an AttributeError, when: |
| # - the try block contains multiple paths |
| # - at least one of those explicitly returns |
| # - at least one of those does not explicitly return |
| # - the non-returning path is taken at runtime |
| |
| async def mixed_return(b: bool) -> bool: |
| try: |
| if b: |
| return b |
| finally: |
| pass |
| return b |
| |
| |
| async def test_async_try_finally_mixed_return() -> None: |
| # Test return path |
| result1 = await mixed_return(True) |
| assert result1 == True |
| |
| # Test non-return path |
| result2 = await mixed_return(False) |
| assert result2 == False |
| |
| [case testAsyncWithMixedReturn] |
| # This used to raise an AttributeError, related to |
| # testAsyncTryFinallyMixedReturn, this is essentially |
| # a far more extensive version of that test surfacing |
| # more edge cases |
| |
| from typing import Optional, Type, Literal |
| |
| |
| class AsyncContextManager: |
| async def __aenter__(self) -> "AsyncContextManager": |
| return self |
| |
| async def __aexit__( |
| self, |
| t: Optional[Type[BaseException]], |
| v: Optional[BaseException], |
| tb: object, |
| ) -> Literal[False]: |
| return False |
| |
| |
| # Simple async functions (generator class) |
| async def gen_1(b: bool) -> bool: |
| async with AsyncContextManager(): |
| if b: |
| return b |
| return b |
| |
| |
| async def gen_2(b: bool) -> bool: |
| async with AsyncContextManager(): |
| if b: |
| return b |
| else: |
| return b |
| |
| |
| async def gen_3(b: bool) -> bool: |
| async with AsyncContextManager(): |
| if b: |
| return b |
| else: |
| pass |
| return b |
| |
| |
| async def gen_4(b: bool) -> bool: |
| ret: bool |
| async with AsyncContextManager(): |
| if b: |
| ret = b |
| else: |
| ret = b |
| return ret |
| |
| |
| async def gen_5(i: int) -> int: |
| async with AsyncContextManager(): |
| if i == 1: |
| return i |
| elif i == 2: |
| pass |
| elif i == 3: |
| return i |
| return i |
| |
| |
| async def gen_6(i: int) -> int: |
| async with AsyncContextManager(): |
| if i == 1: |
| return i |
| elif i == 2: |
| return i |
| elif i == 3: |
| return i |
| return i |
| |
| |
| async def gen_7(i: int) -> int: |
| async with AsyncContextManager(): |
| if i == 1: |
| return i |
| elif i == 2: |
| return i |
| elif i == 3: |
| return i |
| else: |
| return i |
| |
| |
| # Async functions with nested functions (environment class) |
| async def env_1(b: bool) -> bool: |
| def helper() -> bool: |
| return True |
| |
| async with AsyncContextManager(): |
| if b: |
| return helper() |
| return b |
| |
| |
| async def env_2(b: bool) -> bool: |
| def helper() -> bool: |
| return True |
| |
| async with AsyncContextManager(): |
| if b: |
| return helper() |
| else: |
| return b |
| |
| |
| async def env_3(b: bool) -> bool: |
| def helper() -> bool: |
| return True |
| |
| async with AsyncContextManager(): |
| if b: |
| return helper() |
| else: |
| pass |
| return b |
| |
| |
| async def env_4(b: bool) -> bool: |
| def helper() -> bool: |
| return True |
| |
| ret: bool |
| async with AsyncContextManager(): |
| if b: |
| ret = helper() |
| else: |
| ret = b |
| return ret |
| |
| |
| async def env_5(i: int) -> int: |
| def helper() -> int: |
| return 1 |
| |
| async with AsyncContextManager(): |
| if i == 1: |
| return helper() |
| elif i == 2: |
| pass |
| elif i == 3: |
| return i |
| return i |
| |
| |
| async def env_6(i: int) -> int: |
| def helper() -> int: |
| return 1 |
| |
| async with AsyncContextManager(): |
| if i == 1: |
| return helper() |
| elif i == 2: |
| return i |
| elif i == 3: |
| return i |
| return i |
| |
| |
| async def env_7(i: int) -> int: |
| def helper() -> int: |
| return 1 |
| |
| async with AsyncContextManager(): |
| if i == 1: |
| return helper() |
| elif i == 2: |
| return i |
| elif i == 3: |
| return i |
| else: |
| return i |
| |
| |
| async def test_async_with_mixed_return() -> None: |
| # Test simple async functions (generator class) |
| # env_1: mixed return/no-return |
| assert await gen_1(True) is True |
| assert await gen_1(False) is False |
| |
| # gen_2: all branches return |
| assert await gen_2(True) is True |
| assert await gen_2(False) is False |
| |
| # gen_3: mixed return/pass |
| assert await gen_3(True) is True |
| assert await gen_3(False) is False |
| |
| # gen_4: no returns in async with |
| assert await gen_4(True) is True |
| assert await gen_4(False) is False |
| |
| # gen_5: multiple branches, some return |
| assert await gen_5(0) == 0 |
| assert await gen_5(1) == 1 |
| assert await gen_5(2) == 2 |
| assert await gen_5(3) == 3 |
| |
| # gen_6: all explicit branches return, implicit fallthrough |
| assert await gen_6(0) == 0 |
| assert await gen_6(1) == 1 |
| assert await gen_6(2) == 2 |
| assert await gen_6(3) == 3 |
| |
| # gen_7: all branches return including else |
| assert await gen_7(0) == 0 |
| assert await gen_7(1) == 1 |
| assert await gen_7(2) == 2 |
| assert await gen_7(3) == 3 |
| |
| # Test async functions with nested functions (environment class) |
| # env_1: mixed return/no-return |
| assert await env_1(True) is True |
| assert await env_1(False) is False |
| |
| # env_2: all branches return |
| assert await env_2(True) is True |
| assert await env_2(False) is False |
| |
| # env_3: mixed return/pass |
| assert await env_3(True) is True |
| assert await env_3(False) is False |
| |
| # env_4: no returns in async with |
| assert await env_4(True) is True |
| assert await env_4(False) is False |
| |
| # env_5: multiple branches, some return |
| assert await env_5(0) == 0 |
| assert await env_5(1) == 1 |
| assert await env_5(2) == 2 |
| assert await env_5(3) == 3 |
| |
| # env_6: all explicit branches return, implicit fallthrough |
| assert await env_6(0) == 0 |
| assert await env_6(1) == 1 |
| assert await env_6(2) == 2 |
| assert await env_6(3) == 3 |
| |
| # env_7: all branches return including else |
| assert await env_7(0) == 0 |
| assert await env_7(1) == 1 |
| assert await env_7(2) == 2 |
| assert await env_7(3) == 3 |
| |
| [case testAsyncTryExceptFinallyAwait] |
| import asyncio |
| from testutil import assertRaises |
| |
| class TestError(Exception): |
| pass |
| |
| # Test 0: Simplest case - just try/finally with raise and await |
| async def simple_try_finally_await() -> None: |
| try: |
| raise ValueError("simple error") |
| finally: |
| await asyncio.sleep(0) |
| |
| # Test 1: Raise inside try, catch in except, don't re-raise |
| async def async_try_except_no_reraise() -> int: |
| try: |
| raise ValueError("test error") |
| return 1 # Never reached |
| except ValueError: |
| return 2 # Should return this |
| finally: |
| await asyncio.sleep(0) |
| return 3 # Should not reach this |
| |
| # Test 2: Raise inside try, catch in except, re-raise |
| async def async_try_except_reraise() -> int: |
| try: |
| raise ValueError("test error") |
| return 1 # Never reached |
| except ValueError: |
| raise # Re-raise the exception |
| finally: |
| await asyncio.sleep(0) |
| return 2 # Should not reach this |
| |
| # Test 3: Raise inside try, catch in except, raise different error |
| async def async_try_except_raise_different() -> int: |
| try: |
| raise ValueError("original error") |
| return 1 # Never reached |
| except ValueError: |
| raise RuntimeError("different error") |
| finally: |
| await asyncio.sleep(0) |
| return 2 # Should not reach this |
| |
| # Test 4: Another try/except block inside finally |
| async def async_try_except_inside_finally() -> int: |
| try: |
| raise ValueError("outer error") |
| return 1 # Never reached |
| finally: |
| await asyncio.sleep(0) |
| try: |
| raise RuntimeError("inner error") |
| except RuntimeError: |
| pass # Catch inner error |
| return 2 # What happens after finally with inner exception handled? |
| |
| # Test 5: Another try/finally block inside finally |
| async def async_try_finally_inside_finally() -> int: |
| try: |
| raise ValueError("outer error") |
| return 1 # Never reached |
| finally: |
| await asyncio.sleep(0) |
| try: |
| raise RuntimeError("inner error") |
| finally: |
| await asyncio.sleep(0) |
| return 2 # Should not reach this |
| |
| # Control case: No await in finally - should work correctly |
| async def async_exception_no_await_in_finally() -> None: |
| """Control case: This works correctly - exception propagates""" |
| try: |
| raise TestError("This exception will propagate!") |
| finally: |
| pass # No await here |
| |
| # Test function with no exception to check normal flow |
| async def async_no_exception_with_await_in_finally() -> int: |
| try: |
| return 1 # Normal return |
| finally: |
| await asyncio.sleep(0) |
| return 2 # Should not reach this |
| |
| async def test_async_try_except_finally_await() -> None: |
| # Test 0: Simplest case - just try/finally with exception |
| # Expected: ValueError propagates |
| with assertRaises(ValueError): |
| await simple_try_finally_await() |
| |
| # Test 1: Exception caught, not re-raised |
| # Expected: return 2 (from except block) |
| result = await async_try_except_no_reraise() |
| assert result == 2, f"Expected 2, got {result}" |
| |
| # Test 2: Exception caught and re-raised |
| # Expected: ValueError propagates |
| with assertRaises(ValueError): |
| await async_try_except_reraise() |
| |
| # Test 3: Exception caught, different exception raised |
| # Expected: RuntimeError propagates |
| with assertRaises(RuntimeError): |
| await async_try_except_raise_different() |
| |
| # Test 4: Try/except inside finally |
| # Expected: ValueError propagates (outer exception) |
| with assertRaises(ValueError): |
| await async_try_except_inside_finally() |
| |
| # Test 5: Try/finally inside finally |
| # Expected: RuntimeError propagates (inner error) |
| with assertRaises(RuntimeError): |
| await async_try_finally_inside_finally() |
| |
| # Control case: No await in finally (should work correctly) |
| with assertRaises(TestError): |
| await async_exception_no_await_in_finally() |
| |
| # Test normal flow (no exception) |
| # Expected: return 1 |
| result = await async_no_exception_with_await_in_finally() |
| assert result == 1, f"Expected 1, got {result}" |
| |
| [file asyncio/__init__.pyi] |
| async def sleep(t: float) -> None: ... |
| |
| [case testAsyncContextManagerExceptionHandling] |
| import asyncio |
| from typing import Optional, Type |
| from testutil import assertRaises |
| |
| # Test 1: Basic async context manager that doesn't suppress exceptions |
| class AsyncContextManager: |
| async def __aenter__(self) -> 'AsyncContextManager': |
| return self |
| |
| async def __aexit__(self, exc_type: Optional[Type[BaseException]], |
| exc_val: Optional[BaseException], |
| exc_tb: object) -> None: |
| # This await in __aexit__ is like await in finally |
| await asyncio.sleep(0) |
| # Don't suppress the exception (return None/False) |
| |
| async def func_with_async_context_manager() -> str: |
| async with AsyncContextManager(): |
| raise ValueError("Exception inside async with") |
| return "should not reach" # Never reached |
| return "should not reach either" # Never reached |
| |
| async def test_basic_exception() -> str: |
| try: |
| await func_with_async_context_manager() |
| return "func_a returned normally - bug!" |
| except ValueError: |
| return "caught ValueError - correct!" |
| except Exception as e: |
| return f"caught different exception: {type(e).__name__}" |
| |
| # Test 2: Async context manager that raises a different exception in __aexit__ |
| class AsyncContextManagerRaisesInExit: |
| async def __aenter__(self) -> 'AsyncContextManagerRaisesInExit': |
| return self |
| |
| async def __aexit__(self, exc_type: Optional[Type[BaseException]], |
| exc_val: Optional[BaseException], |
| exc_tb: object) -> None: |
| # This await in __aexit__ is like await in finally |
| await asyncio.sleep(0) |
| # Raise a different exception - this should replace the original exception |
| raise RuntimeError("Exception in __aexit__") |
| |
| async def func_with_raising_context_manager() -> str: |
| async with AsyncContextManagerRaisesInExit(): |
| raise ValueError("Original exception") |
| return "should not reach" # Never reached |
| return "should not reach either" # Never reached |
| |
| async def test_exception_in_aexit() -> str: |
| try: |
| await func_with_raising_context_manager() |
| return "func returned normally - unexpected!" |
| except RuntimeError: |
| return "caught RuntimeError - correct!" |
| except ValueError: |
| return "caught ValueError - original exception not replaced!" |
| except Exception as e: |
| return f"caught different exception: {type(e).__name__}" |
| |
| async def test_async_context_manager_exception_handling() -> None: |
| # Test 1: Basic exception propagation |
| result = await test_basic_exception() |
| # Expected: "caught ValueError - correct!" |
| assert result == "caught ValueError - correct!", f"Expected exception to propagate, got: {result}" |
| |
| # Test 2: Exception raised in __aexit__ replaces original exception |
| result = await test_exception_in_aexit() |
| # Expected: "caught RuntimeError - correct!" |
| # (The RuntimeError from __aexit__ should replace the ValueError) |
| assert result == "caught RuntimeError - correct!", f"Expected RuntimeError from __aexit__, got: {result}" |
| |
| [file asyncio/__init__.pyi] |
| async def sleep(t: float) -> None: ... |
| |
| [case testCallableArgWithSameNameAsHelperMethod] |
| import asyncio |
| from typing import Awaitable, Callable |
| |
| |
| MyCallable = Callable[[int, int], Awaitable[int]] |
| |
| async def add(a: int, b: int) -> int: |
| return a + b |
| |
| async def await_send(send: MyCallable) -> int: |
| return await send(1, 2) |
| |
| async def await_throw(throw: MyCallable) -> int: |
| return await throw(3, 4) |
| |
| async def tests() -> None: |
| assert await await_send(add) == 3 |
| assert await await_throw(add) == 7 |
| |
| def test_callable_arg_same_name_as_helper() -> None: |
| asyncio.run(tests()) |
| |
| [file asyncio/__init__.pyi] |
| def run(x: object) -> object: ... |
| |
| [case testRunAsyncCancelFinallySpecialCase] |
| import asyncio |
| |
| from testutil import assertRaises |
| |
| # Greatly simplified from asyncio.Condition |
| class Condition: |
| async def acquire(self) -> None: pass |
| |
| async def wait(self) -> bool: |
| l = asyncio.get_running_loop() |
| fut = l.create_future() |
| a = [] |
| try: |
| try: |
| a.append(fut) |
| try: |
| await fut |
| return True |
| finally: |
| a.pop() |
| finally: |
| err = None |
| while True: |
| try: |
| await self.acquire() |
| break |
| except asyncio.CancelledError as e: |
| err = e |
| |
| if err is not None: |
| try: |
| raise err |
| finally: |
| err = None |
| except BaseException: |
| raise |
| |
| async def do_cancel() -> None: |
| cond = Condition() |
| wait = asyncio.create_task(cond.wait()) |
| asyncio.get_running_loop().call_soon(wait.cancel) |
| with assertRaises(asyncio.CancelledError): |
| await wait |
| |
| def test_cancel_special_case() -> None: |
| asyncio.run(do_cancel()) |
| |
| [file asyncio/__init__.pyi] |
| from typing import Any |
| |
| class CancelledError(Exception): ... |
| |
| def run(x: object) -> object: ... |
| def get_running_loop() -> Any: ... |
| def create_task(x: object) -> Any: ... |