| # async test cases (compile and run) |
| |
| [case testRunAsyncBasics] |
| import asyncio |
| from typing import Callable, Awaitable |
| |
| from testutil import assertRaises |
| |
async def h() -> int:
    """Return 1 without awaiting anything."""
    return 1


async def g() -> int:
    """Await ``asyncio.sleep(0)``, then return the awaited result of ``h``."""
    await asyncio.sleep(0)
    return await h()


async def f() -> int:
    """Return the awaited result of ``g`` plus 2."""
    base = await g()
    return base + 2


async def f2() -> int:
    """Accumulate ``i + await f() + await g()`` for ``i`` in ``range(2)``."""
    total = 0
    for step in range(2):
        # Two awaits inside one arithmetic expression; keep them in a
        # single statement, since that is what this case exercises.
        total += step + await f() + await g()
    return total


def test_simple_call() -> None:
    """``f`` resolves to 1 (from ``h``) plus 2."""
    assert asyncio.run(f()) == 3


def test_multiple_awaits_in_expression() -> None:
    """``f2`` resolves to (0 + 3 + 1) + (1 + 3 + 1)."""
    assert asyncio.run(f2()) == 9
| |
| class MyError(Exception): |
| pass |
| |
| async def exc1() -> None: |
| await asyncio.sleep(0) |
| raise MyError() |
| |
| async def exc2() -> None: |
| await asyncio.sleep(0) |
| raise MyError() |
| |
| async def exc3() -> None: |
| await exc1() |
| |
| async def exc4() -> None: |
| await exc2() |
| |
| async def exc5() -> int: |
| try: |
| await exc1() |
| except MyError: |
| return 3 |
| return 4 |
| |
| async def exc6() -> int: |
| try: |
| await exc4() |
| except MyError: |
| return 3 |
| return 4 |
| |
| def test_exception() -> None: |
| with assertRaises(MyError): |
| asyncio.run(exc1()) |
| with assertRaises(MyError): |
| asyncio.run(exc2()) |
| with assertRaises(MyError): |
| asyncio.run(exc3()) |
| with assertRaises(MyError): |
| asyncio.run(exc4()) |
| assert asyncio.run(exc5()) == 3 |
| assert asyncio.run(exc6()) == 3 |
| |
| async def indirect_call(x: int, c: Callable[[int], Awaitable[int]]) -> int: |
| return await c(x) |
| |
| async def indirect_call_2(a: Awaitable[None]) -> None: |
| await a |
| |
| async def indirect_call_3(a: Awaitable[float]) -> float: |
| return (await a) + 1.0 |
| |
| async def inc(x: int) -> int: |
| await asyncio.sleep(0) |
| return x + 1 |
| |
| async def ident(x: float, err: bool = False) -> float: |
| await asyncio.sleep(0.0) |
| if err: |
| raise MyError() |
| return x + float("0.0") |
| |
| def test_indirect_call() -> None: |
| assert asyncio.run(indirect_call(3, inc)) == 4 |
| |
| with assertRaises(MyError): |
| asyncio.run(indirect_call_2(exc1())) |
| |
| assert asyncio.run(indirect_call_3(ident(2.0))) == 3.0 |
| assert asyncio.run(indirect_call_3(ident(-113.0))) == -112.0 |
| assert asyncio.run(indirect_call_3(ident(-114.0))) == -113.0 |
| |
| with assertRaises(MyError): |
| asyncio.run(indirect_call_3(ident(1.0, True))) |
| with assertRaises(MyError): |
| asyncio.run(indirect_call_3(ident(-113.0, True))) |
| |
| class C: |
| def __init__(self, n: int) -> None: |
| self.n = n |
| |
| async def add(self, x: int, err: bool = False) -> int: |
| await asyncio.sleep(0) |
| if err: |
| raise MyError() |
| return x + self.n |
| |
| async def method_call(x: int) -> int: |
| c = C(5) |
| return await c.add(x) |
| |
| async def method_call_exception() -> int: |
| c = C(5) |
| return await c.add(3, err=True) |
| |
| def test_async_method_call() -> None: |
| assert asyncio.run(method_call(3)) == 8 |
| with assertRaises(MyError): |
| asyncio.run(method_call_exception()) |
| |
| [file asyncio/__init__.pyi] |
| async def sleep(t: float) -> None: ... |
| # eh, we could use the real type but it doesn't seem important |
| def run(x: object) -> object: ... |
| |
| [typing fixtures/typing-full.pyi] |
| |
| [case testRunAsyncAwaitInVariousPositions] |
| from typing import cast, Any |
| |
| import asyncio |
| |
| async def one() -> int: |
| await asyncio.sleep(0.0) |
| return int() + 1 |
| |
| async def true() -> bool: |
| return bool(int() + await one()) |
| |
| async def branch_await() -> int: |
| if bool(int() + 1) == await true(): |
| return 3 |
| return 2 |
| |
| async def branch_await_not() -> int: |
| if bool(int() + 1) == (not await true()): |
| return 3 |
| return 2 |
| |
| def test_branch() -> None: |
| assert asyncio.run(branch_await()) == 3 |
| assert asyncio.run(branch_await_not()) == 2 |
| |
| async def assign_multi() -> int: |
| _, x = int(), await one() |
| return x + 1 |
| |
| def test_assign_multi() -> None: |
| assert asyncio.run(assign_multi()) == 2 |
| |
| class C: |
| def __init__(self, s: str) -> None: |
| self.s = s |
| |
| def concat(self, s: str) -> str: |
| return self.s + s |
| |
| async def make_c(s: str) -> C: |
| await one() |
| return C(s) |
| |
| async def concat(s: str, t: str) -> str: |
| await one() |
| return s + t |
| |
| async def set_attr(s: str) -> None: |
| (await make_c("xyz")).s = await concat(s, "!") |
| |
| def test_set_attr() -> None: |
| asyncio.run(set_attr("foo")) # Just check that it compiles and runs |
| |
| def concat2(x: str, y: str) -> str: |
| return x + y |
| |
| async def call1(s: str) -> str: |
| return concat2(str(int()), await concat(s, "a")) |
| |
| async def call2(s: str) -> str: |
| return await concat(str(int()), await concat(s, "b")) |
| |
| def test_call() -> None: |
| assert asyncio.run(call1("foo")) == "0fooa" |
| assert asyncio.run(call2("foo")) == "0foob" |
| |
| async def method_call(s: str) -> str: |
| return C("<").concat(await concat(s, ">")) |
| |
| def test_method_call() -> None: |
| assert asyncio.run(method_call("foo")) == "<foo>" |
| |
| class D: |
| def __init__(self, a: str, b: str) -> None: |
| self.a = a |
| self.b = b |
| |
| async def construct(s: str) -> str: |
| c = D(await concat(s, "!"), await concat(s, "?")) |
| return c.a + c.b |
| |
| def test_construct() -> None: |
| assert asyncio.run(construct("foo")) == "foo!foo?" |
| |
| [file asyncio/__init__.pyi] |
| async def sleep(t: float) -> None: ... |
| # eh, we could use the real type but it doesn't seem important |
| def run(x: object) -> object: ... |
| |
| [typing fixtures/typing-full.pyi] |
| |
| |
| [case testAsyncWith] |
| from testutil import async_val |
| |
| class async_ctx: |
| async def __aenter__(self) -> str: |
| await async_val("enter") |
| return "test" |
| |
| async def __aexit__(self, x, y, z) -> None: |
| await async_val("exit") |
| |
| |
| async def async_with() -> str: |
| async with async_ctx() as x: |
| return await async_val("body") |
| |
| |
| [file driver.py] |
| from native import async_with |
| from testutil import run_generator |
| |
| yields, val = run_generator(async_with(), [None, 'x', None]) |
| assert yields == ('enter', 'body', 'exit'), yields |
| assert val == 'x', val |
| |
| |
| [case testAsyncReturn] |
| from testutil import async_val |
| |
| async def async_return() -> str: |
| try: |
| return 'test' |
| finally: |
| await async_val('foo') |
| |
| [file driver.py] |
| from native import async_return |
| from testutil import run_generator |
| |
| yields, val = run_generator(async_return()) |
| assert yields == ('foo',) |
| assert val == 'test', val |
| |
| [case testAsyncFor] |
| from typing import AsyncIterable, List, Set, Dict |
| |
| async def async_iter(xs: AsyncIterable[int]) -> List[int]: |
| ys = [] |
| async for x in xs: |
| ys.append(x) |
| return ys |
| |
| async def async_comp(xs: AsyncIterable[int]) -> List[int]: |
| ys = [x async for x in xs] |
| return ys |
| |
| async def async_comp_set(xs: AsyncIterable[int]) -> Set[int]: |
| return {x async for x in xs} |
| |
| async def async_comp_dict(xs: AsyncIterable[int]) -> Dict[int, str]: |
| return {x: str(x) async for x in xs} |
| |
| [typing fixtures/typing-full.pyi] |
| |
| [file driver.py] |
| from native import async_iter, async_comp, async_comp_set, async_comp_dict |
| from testutil import run_generator, async_val |
| from typing import AsyncIterable, List |
| |
| # defined here since we couldn't do it inside the test yet... |
| async def foo() -> AsyncIterable[int]: |
| for x in range(3): |
| await async_val(x) |
| yield x |
| |
| yields, val = run_generator(async_iter(foo())) |
| assert val == [0,1,2], val |
| assert yields == (0,1,2), yields |
| |
| yields, val = run_generator(async_comp(foo())) |
| assert val == [0,1,2], val |
| assert yields == (0,1,2), yields |
| |
| yields, val = run_generator(async_comp_set(foo())) |
| assert val == {0,1,2}, val |
| assert yields == (0,1,2), yields |
| |
| yields, val = run_generator(async_comp_dict(foo())) |
| assert val == {0: '0',1: '1', 2: '2'}, val |
| assert yields == (0,1,2), yields |
| |
| [case testAsyncFor2] |
| from typing import AsyncIterable, List |
| |
| async def async_iter(xs: AsyncIterable[int]) -> List[int]: |
| ys = [] |
| async for x in xs: |
| ys.append(x) |
| return ys |
| |
| [typing fixtures/typing-full.pyi] |
| |
| [file driver.py] |
| from native import async_iter |
| from testutil import run_generator, async_val |
| from typing import AsyncIterable, List |
| |
| # defined here since we couldn't do it inside the test yet... |
| async def foo() -> AsyncIterable[int]: |
| for x in range(3): |
| await async_val(x) |
| yield x |
| raise Exception('lol no') |
| |
| yields, val = run_generator(async_iter(foo())) |
| assert yields == (0,1,2), yields |
| assert val == 'lol no', val |
| |
| [case testAsyncWithVarReuse] |
class ConMan:
    """Async context manager whose ``__aenter__`` produces 1."""

    async def __aenter__(self) -> int:
        return 1

    async def __aexit__(self, *exc: object):
        # Returns None (falsy), so an in-flight exception is not suppressed.
        return None


class ConManB:
    """Async context manager whose ``__aenter__`` produces 2."""

    async def __aenter__(self) -> int:
        return 2

    async def __aexit__(self, *exc: object):
        # Returns None (falsy), so an in-flight exception is not suppressed.
        return None


async def x() -> None:
    """Bind the same target name ``f`` in two consecutive ``async with`` blocks.

    The running total starts at 2 and is checked after each block.
    """
    total = 2
    async with ConMan() as f:
        total += f
    assert total == 3, total
    # Same name ``f`` again, now bound by a different manager class.
    async with ConManB() as f:
        total += f
    assert total == 5, total
| |
| [typing fixtures/typing-full.pyi] |
| [file driver.py] |
| import asyncio |
| import native |
| asyncio.run(native.x()) |
| |
| [case testRunAsyncSpecialCases] |
| import asyncio |
| |
async def t() -> tuple[int, str, str]:
    """Return the fixed tuple ``(1, "x", "y")``."""
    return 1, "x", "y"


async def f() -> tuple[int, str, str]:
    """Return the tuple produced by awaiting ``t``."""
    return await t()


def test_tuple_return() -> None:
    """A tuple returned through two coroutine layers arrives unchanged."""
    assert asyncio.run(f()) == (1, "x", "y")


async def e() -> ValueError:
    """Return (not raise) a ``ValueError`` instance."""
    return ValueError("foo")


async def g() -> ValueError:
    """Return the exception object produced by awaiting ``e``."""
    return await e()


def test_exception_return() -> None:
    """An exception instance used as a return value is handed back as a value."""
    returned = asyncio.run(g())
    assert isinstance(returned, ValueError)
| |
| [file asyncio/__init__.pyi] |
| async def sleep(t: float) -> None: ... |
| # eh, we could use the real type but it doesn't seem important |
| def run(x: object) -> object: ... |
| |
| [typing fixtures/typing-full.pyi] |
| |
| [case testRunAsyncRefCounting] |
| import asyncio |
| import gc |
| |
def assert_no_leaks(fn, max_new):
    """Assert that repeatedly running ``fn()`` does not grow the gc-tracked object count.

    Args:
        fn: Zero-argument callable; each call must return a fresh awaitable
            that is passed to ``asyncio.run``.
        max_new: Largest allowed increase in the number of gc-tracked objects
            across the ten measured runs.

    Raises:
        AssertionError: If the object count grew by more than ``max_new``.
            The observed growth is the assertion message.
    """
    # Warm-up, in case asyncio allocates something on first use
    asyncio.run(fn())

    gc.collect()
    # Record only the count. Holding the list returned by gc.get_objects()
    # would keep every object alive during the measured runs, and the list
    # itself would be counted as a new object in the second snapshot.
    baseline = len(gc.get_objects())

    for _ in range(10):
        asyncio.run(fn())

    gc.collect()
    delta = len(gc.get_objects()) - baseline
    # Often a few persistent objects get allocated, which may be unavoidable.
    # The main thing we care about is that each iteration does not leak an
    # additional object.
    assert delta <= max_new, delta
| |
async def concat_one(x: str) -> str:
    """Return ``x`` with the character "1" appended."""
    suffix = "1"
    return x + suffix


async def foo(n: int) -> str:
    """Grow a string one awaited ``concat_one`` call at a time until its length reaches ``n``."""
    built = ""
    while True:
        if len(built) >= n:
            return built
        built = await concat_one(built)
| |
| def test_trivial() -> None: |
| assert_no_leaks(lambda: foo(1000), 5) |
| |
| async def make_list(a: list[int]) -> list[int]: |
| await concat_one("foobar") |
| return [a[0]] |
| |
| async def spill() -> list[int]: |
| a: list[int] = [] |
| for i in range(5): |
| await asyncio.sleep(0.0001) |
| a = (await make_list(a + [1])) + a + (await make_list(a + [2])) |
| return a |
| |
| async def bar(n: int) -> None: |
| for i in range(n): |
| await spill() |
| |
| def test_spilled() -> None: |
| assert_no_leaks(lambda: bar(40), 2) |
| |
async def raise_deep(n: int) -> str:
    """Recurse ``n`` levels through awaits, then raise ``TypeError``.

    Sleeps briefly at depth 2 and at depth 0, so the exception crosses
    frames that have been suspended as well as frames that have not.

    Raises:
        TypeError: Always, once the recursion reaches ``n == 0``.
    """
    if n == 0:
        await asyncio.sleep(0.0001)
        raise TypeError(str(n))
    else:
        if n == 2:
            await asyncio.sleep(0.0001)
        return await raise_deep(n - 1)

async def maybe_raise(n: int) -> str:
    """Return ``str(n)``, unless ``n`` is a multiple of 3.

    Multiples of 3 propagate the ``TypeError`` from ``raise_deep(5)``.
    Other multiples of 29 sleep briefly before returning.
    """
    if n % 3 == 0:
        await raise_deep(5)
    elif n % 29 == 0:
        await asyncio.sleep(0.0001)
    return str(n)

async def exc(n: int) -> list[str]:
    """Collect one string per ``i`` in ``range(n)``, mixing normal and raising awaits.

    Appends ``"0" + str(i)`` when ``maybe_raise(i)`` returns, and ``"5"``
    when it raises ``TypeError``.
    """
    a = []
    for i in range(n):
        try:
            # Pass the loop index, not ``n``: with a constant argument every
            # iteration takes the same branch, and for the n=50 used by the
            # test the exception path would never run at all.
            a.append(str(int()) + await maybe_raise(i))
        except TypeError:
            a.append(str(int() + 5))
    return a
| |
| def test_exception() -> None: |
| assert_no_leaks(lambda: exc(50), 2) |
| |
| class C: |
| def __init__(self, s: str) -> None: |
| self.s = s |
| |
| async def id(c: C) -> C: |
| return c |
| |
| async def stolen_helper(c: C, s: str) -> str: |
| await asyncio.sleep(0.0001) |
| (await id(c)).s = await concat_one(s) |
| await asyncio.sleep(0.0001) |
| return c.s |
| |
| async def stolen(n: int) -> int: |
| for i in range(n): |
| c = C(str(i)) |
| s = await stolen_helper(c, str(i + 2)) |
| assert s == str(i + 2) + "1" |
| return n |
| |
| def test_stolen() -> None: |
| assert_no_leaks(lambda: stolen(100), 2) |
| |
| [file asyncio/__init__.pyi] |
| def run(x: object) -> object: ... |
| async def sleep(t: float) -> None: ... |
| |
| [case testRunAsyncMiscTypesInEnvironment] |
| # Here we test that values of various kinds of types can be spilled to the |
| # environment. In particular, types with "overlapping error values" such as |
| # i64 can be tricky, since they require extra work to support undefined |
| # attribute values (which raise AttributeError when accessed). For these, |
| # the object struct has a bitfield which keeps track of whether certain |
| # attributes have an assigned value. |
| # |
| # In practice we mark these attributes as "always defined", which causes these |
| # checks to be skipped on attribute access, and thus we don't require the |
| # bitfield to exist. |
| # |
| # See the comment of RType.error_overlap for more information. |
| |
| import asyncio |
| |
| from mypy_extensions import i64, i32, i16, u8 |
| |
| async def inc_float(x: float) -> float: |
| return x + 1.0 |
| |
| async def inc_i64(x: i64) -> i64: |
| return x + 1 |
| |
| async def inc_i32(x: i32) -> i32: |
| return x + 1 |
| |
| async def inc_i16(x: i16) -> i16: |
| return x + 1 |
| |
| async def inc_u8(x: u8) -> u8: |
| return x + 1 |
| |
| async def inc_tuple(x: tuple[i64, float]) -> tuple[i64, float]: |
| return x[0] + 1, x[1] + 1.5 |
| |
| async def neg_bool(b: bool) -> bool: |
| return not b |
| |
| async def float_ops(x: float) -> float: |
| n = x |
| n = await inc_float(n) |
| n = float("0.5") + await inc_float(n) |
| return n |
| |
| def test_float() -> None: |
| assert asyncio.run(float_ops(2.5)) == 5.0 |
| |
| async def i64_ops(x: i64) -> i64: |
| n = x |
| n = await inc_i64(n) |
| n = i64("1") + await inc_i64(n) |
| return n |
| |
| def test_i64() -> None: |
| assert asyncio.run(i64_ops(2)) == 5 |
| |
| async def i32_ops(x: i32) -> i32: |
| n = x |
| n = await inc_i32(n) |
| n = i32("1") + await inc_i32(n) |
| return n |
| |
| def test_i32() -> None: |
| assert asyncio.run(i32_ops(3)) == 6 |
| |
| async def i16_ops(x: i16) -> i16: |
| n = x |
| n = await inc_i16(n) |
| n = i16("1") + await inc_i16(n) |
| return n |
| |
| def test_i16() -> None: |
| assert asyncio.run(i16_ops(4)) == 7 |
| |
| async def u8_ops(x: u8) -> u8: |
| n = x |
| n = await inc_u8(n) |
| n = u8("1") + await inc_u8(n) |
| return n |
| |
| def test_u8() -> None: |
| assert asyncio.run(u8_ops(5)) == 8 |
| |
| async def tuple_ops(x: tuple[i64, float]) -> tuple[i64, float]: |
| n = x |
| n = await inc_tuple(n) |
| m = ((i64("1"), float("0.5")), await inc_tuple(n)) |
| return m[1] |
| |
| def test_tuple() -> None: |
| assert asyncio.run(tuple_ops((1, 2.5))) == (3, 5.5) |
| |
| async def bool_ops(x: bool) -> bool: |
| n = x |
| n = await neg_bool(n) |
| m = (bool("1"), await neg_bool(n)) |
| return m[0] and m[1] |
| |
| def test_bool() -> None: |
| assert asyncio.run(bool_ops(True)) is True |
| assert asyncio.run(bool_ops(False)) is False |
| |
| [file asyncio/__init__.pyi] |
| def run(x: object) -> object: ... |
| |
| [case testRunAsyncNestedFunctions] |
| from __future__ import annotations |
| |
| import asyncio |
| from typing import cast, Iterator, overload, Awaitable, Any, TypeVar |
| |
| from testutil import assertRaises |
| |
def normal_contains_async_def(x: int) -> int:
    """Define a coroutine inside a regular function and run it to completion.

    Returns 5 plus the nested coroutine's result for the argument 6.
    """
    async def add_captured(y: int) -> int:
        # ``x`` is captured from the enclosing non-async function.
        return x + y

    nested_result = cast(int, asyncio.run(add_captured(6)))
    return 5 + nested_result


def test_def_contains_async_def() -> None:
    """5 + (3 captured + 6 passed) == 14."""
    expected = 14
    assert normal_contains_async_def(3) == expected
| |
async def inc(x: int) -> int:
    """Return ``x + 1``."""
    return x + 1

async def async_def_contains_normal(x: int) -> int:
    """Call a nested regular function, with awaited arguments, from a coroutine.

    Returns ``x`` plus ``nested(4, 5)``, where ``nested`` also adds the
    captured ``x``.
    """
    def nested(y: int, z: int) -> int:
        # ``x`` is captured from the enclosing coroutine.
        return x + y + z

    a = x
    a += nested((await inc(3)), (await inc(4)))
    return a

def test_async_def_contains_normal() -> None:
    """Run ``async_def_contains_normal`` itself.

    Previously this called ``normal_contains_async_def(2)``, which also
    evaluates to 13, so the coroutine under test was never executed.
    """
    assert asyncio.run(async_def_contains_normal(2)) == (2 + 2 + 4 + 5)
| |
| async def async_def_contains_async_def(x: int) -> int: |
| async def f(y: int) -> int: |
| return (await inc(x)) + (await inc(y)) |
| |
| return (await f(1)) + (await f(2)) |
| |
| def test_async_def_contains_async_def() -> None: |
| assert asyncio.run(async_def_contains_async_def(3)) == (3 + 1 + 1 + 1) + (3 + 1 + 2 + 1) |
| |
| async def async_def_contains_generator(x: int) -> tuple[int, int, int]: |
| def gen(y: int) -> Iterator[int]: |
| yield x + 1 |
| yield x + y |
| |
| it = gen(4) |
| res = x + 10, next(it), next(it) |
| |
| with assertRaises(StopIteration): |
| next(it) |
| |
| return res |
| |
| def test_async_def_contains_generator() -> None: |
| assert asyncio.run(async_def_contains_generator(3)) == (13, 4, 7) |
| |
| def generator_contains_async_def(x: int) -> Iterator[int]: |
| async def f(y: int) -> int: |
| return (await inc(x)) + (await inc(y)) |
| |
| yield cast(int, asyncio.run(f(2))) |
| yield cast(int, asyncio.run(f(3))) |
| yield x + 10 |
| |
| def test_generator_contains_async_def() -> None: |
| assert list(generator_contains_async_def(5)) == [6 + 3, 6 + 4, 15] |
| |
| async def async_def_contains_two_nested_functions(x: int, y: int) -> tuple[int, int]: |
| def f(a: int) -> int: |
| return x + a |
| |
| def g(b: int, c: int) -> int: |
| return y + b + c |
| |
| return (await inc(f(3))), (await inc(g(4, 10))) |
| |
| def test_async_def_contains_two_nested_functions() -> None: |
| assert asyncio.run(async_def_contains_two_nested_functions(5, 7)) == ( |
| (5 + 3 + 1), (7 + 4 + 10 + 1) |
| ) |
| |
| async def async_def_contains_overloaded_async_def(n: int) -> int: |
| @overload |
| async def f(x: int) -> int: ... |
| |
| @overload |
| async def f(x: str) -> str: ... |
| |
| async def f(x: int | str) -> Any: |
| return x |
| |
| return (await f(n)) + 1 |
| |
| |
| def test_async_def_contains_overloaded_async_def() -> None: |
| assert asyncio.run(async_def_contains_overloaded_async_def(5)) == 6 |
| |
T = TypeVar("T")


def deco(f: T) -> T:
    """Identity decorator: hand back the decorated object unchanged."""
    return f


async def async_def_contains_decorated_async_def(n: int) -> int:
    """Await a decorated nested coroutine and add 1 to its result."""
    @deco
    async def add_two(x: int) -> int:
        return x + 2

    inner = await add_two(n)
    return inner + 1


def test_async_def_contains_decorated_async_def() -> None:
    """7 + 2 (nested coroutine) + 1 (outer coroutine) == 10."""
    outcome = asyncio.run(async_def_contains_decorated_async_def(7))
    assert outcome == 10
| [file asyncio/__init__.pyi] |
| def run(x: object) -> object: ... |
| |
| [case testAsyncTryFinallyMixedReturn] |
| # This used to raise an AttributeError, when: |
| # - the try block contains multiple paths |
| # - at least one of those explicitly returns |
| # - at least one of those does not explicitly return |
| # - the non-returning path is taken at runtime |
| |
| import asyncio |
| |
| |
| async def test_mixed_return(b: bool) -> bool: |
| try: |
| if b: |
| return b |
| finally: |
| pass |
| return b |
| |
| |
async def test_run() -> None:
    """Await ``test_mixed_return`` on both its returning and non-returning try paths."""
    # Test return path
    result1 = await test_mixed_return(True)
    # Identity check, not ``== True``: the result must be the bool itself.
    assert result1 is True

    # Test non-return path
    result2 = await test_mixed_return(False)
    assert result2 is False
| |
| |
| def test_async_try_finally_mixed_return() -> None: |
| asyncio.run(test_run()) |
| |
| [file driver.py] |
| from native import test_async_try_finally_mixed_return |
| test_async_try_finally_mixed_return() |
| |
| [file asyncio/__init__.pyi] |
| def run(x: object) -> object: ... |
| |
| [case testAsyncWithMixedReturn] |
| # This used to raise an AttributeError. It is related to |
| # testAsyncTryFinallyMixedReturn, but is essentially a far |
| # more extensive version of that test that surfaces more |
| # edge cases. |
| |
| import asyncio |
| from typing import Optional, Type, Literal |
| |
| |
| class AsyncContextManager: |
| async def __aenter__(self) -> "AsyncContextManager": |
| return self |
| |
| async def __aexit__( |
| self, |
| t: Optional[Type[BaseException]], |
| v: Optional[BaseException], |
| tb: object, |
| ) -> Literal[False]: |
| return False |
| |
| |
| # Simple async functions (generator class) |
| async def test_gen_1(b: bool) -> bool: |
| async with AsyncContextManager(): |
| if b: |
| return b |
| return b |
| |
| |
| async def test_gen_2(b: bool) -> bool: |
| async with AsyncContextManager(): |
| if b: |
| return b |
| else: |
| return b |
| |
| |
| async def test_gen_3(b: bool) -> bool: |
| async with AsyncContextManager(): |
| if b: |
| return b |
| else: |
| pass |
| return b |
| |
| |
| async def test_gen_4(b: bool) -> bool: |
| ret: bool |
| async with AsyncContextManager(): |
| if b: |
| ret = b |
| else: |
| ret = b |
| return ret |
| |
| |
| async def test_gen_5(i: int) -> int: |
| async with AsyncContextManager(): |
| if i == 1: |
| return i |
| elif i == 2: |
| pass |
| elif i == 3: |
| return i |
| return i |
| |
| |
| async def test_gen_6(i: int) -> int: |
| async with AsyncContextManager(): |
| if i == 1: |
| return i |
| elif i == 2: |
| return i |
| elif i == 3: |
| return i |
| return i |
| |
| |
| async def test_gen_7(i: int) -> int: |
| async with AsyncContextManager(): |
| if i == 1: |
| return i |
| elif i == 2: |
| return i |
| elif i == 3: |
| return i |
| else: |
| return i |
| |
| |
| # Async functions with nested functions (environment class) |
| async def test_env_1(b: bool) -> bool: |
| def helper() -> bool: |
| return True |
| |
| async with AsyncContextManager(): |
| if b: |
| return helper() |
| return b |
| |
| |
| async def test_env_2(b: bool) -> bool: |
| def helper() -> bool: |
| return True |
| |
| async with AsyncContextManager(): |
| if b: |
| return helper() |
| else: |
| return b |
| |
| |
| async def test_env_3(b: bool) -> bool: |
| def helper() -> bool: |
| return True |
| |
| async with AsyncContextManager(): |
| if b: |
| return helper() |
| else: |
| pass |
| return b |
| |
| |
| async def test_env_4(b: bool) -> bool: |
| def helper() -> bool: |
| return True |
| |
| ret: bool |
| async with AsyncContextManager(): |
| if b: |
| ret = helper() |
| else: |
| ret = b |
| return ret |
| |
| |
| async def test_env_5(i: int) -> int: |
| def helper() -> int: |
| return 1 |
| |
| async with AsyncContextManager(): |
| if i == 1: |
| return helper() |
| elif i == 2: |
| pass |
| elif i == 3: |
| return i |
| return i |
| |
| |
| async def test_env_6(i: int) -> int: |
| def helper() -> int: |
| return 1 |
| |
| async with AsyncContextManager(): |
| if i == 1: |
| return helper() |
| elif i == 2: |
| return i |
| elif i == 3: |
| return i |
| return i |
| |
| |
| async def test_env_7(i: int) -> int: |
| def helper() -> int: |
| return 1 |
| |
| async with AsyncContextManager(): |
| if i == 1: |
| return helper() |
| elif i == 2: |
| return i |
| elif i == 3: |
| return i |
| else: |
| return i |
| |
| |
| async def run_all_tests() -> None: |
| # Test simple async functions (generator class) |
| # test_gen_1: mixed return/no-return |
| assert await test_gen_1(True) is True |
| assert await test_gen_1(False) is False |
| |
| # test_gen_2: all branches return |
| assert await test_gen_2(True) is True |
| assert await test_gen_2(False) is False |
| |
| # test_gen_3: mixed return/pass |
| assert await test_gen_3(True) is True |
| assert await test_gen_3(False) is False |
| |
| # test_gen_4: no returns in async with |
| assert await test_gen_4(True) is True |
| assert await test_gen_4(False) is False |
| |
| # test_gen_5: multiple branches, some return |
| assert await test_gen_5(0) == 0 |
| assert await test_gen_5(1) == 1 |
| assert await test_gen_5(2) == 2 |
| assert await test_gen_5(3) == 3 |
| |
| # test_gen_6: all explicit branches return, implicit fallthrough |
| assert await test_gen_6(0) == 0 |
| assert await test_gen_6(1) == 1 |
| assert await test_gen_6(2) == 2 |
| assert await test_gen_6(3) == 3 |
| |
| # test_gen_7: all branches return including else |
| assert await test_gen_7(0) == 0 |
| assert await test_gen_7(1) == 1 |
| assert await test_gen_7(2) == 2 |
| assert await test_gen_7(3) == 3 |
| |
| # Test async functions with nested functions (environment class) |
| # test_env_1: mixed return/no-return |
| assert await test_env_1(True) is True |
| assert await test_env_1(False) is False |
| |
| # test_env_2: all branches return |
| assert await test_env_2(True) is True |
| assert await test_env_2(False) is False |
| |
| # test_env_3: mixed return/pass |
| assert await test_env_3(True) is True |
| assert await test_env_3(False) is False |
| |
| # test_env_4: no returns in async with |
| assert await test_env_4(True) is True |
| assert await test_env_4(False) is False |
| |
| # test_env_5: multiple branches, some return |
| assert await test_env_5(0) == 0 |
| assert await test_env_5(1) == 1 |
| assert await test_env_5(2) == 2 |
| assert await test_env_5(3) == 3 |
| |
| # test_env_6: all explicit branches return, implicit fallthrough |
| assert await test_env_6(0) == 0 |
| assert await test_env_6(1) == 1 |
| assert await test_env_6(2) == 2 |
| assert await test_env_6(3) == 3 |
| |
| # test_env_7: all branches return including else |
| assert await test_env_7(0) == 0 |
| assert await test_env_7(1) == 1 |
| assert await test_env_7(2) == 2 |
| assert await test_env_7(3) == 3 |
| |
| |
| def test_async_with_mixed_return() -> None: |
| asyncio.run(run_all_tests()) |
| |
| [file driver.py] |
| from native import test_async_with_mixed_return |
| test_async_with_mixed_return() |
| |
| [file asyncio/__init__.pyi] |
| def run(x: object) -> object: ... |
| |
| [case testAsyncTryExceptFinallyAwait] |
| import asyncio |
| from testutil import assertRaises |
| |
| class TestError(Exception): |
| pass |
| |
| # Test 0: Simplest case - just try/finally with raise and await |
| async def simple_try_finally_await() -> None: |
| try: |
| raise ValueError("simple error") |
| finally: |
| await asyncio.sleep(0) |
| |
| # Test 1: Raise inside try, catch in except, don't re-raise |
| async def async_try_except_no_reraise() -> int: |
| try: |
| raise ValueError("test error") |
| return 1 # Never reached |
| except ValueError: |
| return 2 # Should return this |
| finally: |
| await asyncio.sleep(0) |
| return 3 # Should not reach this |
| |
| # Test 2: Raise inside try, catch in except, re-raise |
| async def async_try_except_reraise() -> int: |
| try: |
| raise ValueError("test error") |
| return 1 # Never reached |
| except ValueError: |
| raise # Re-raise the exception |
| finally: |
| await asyncio.sleep(0) |
| return 2 # Should not reach this |
| |
| # Test 3: Raise inside try, catch in except, raise different error |
| async def async_try_except_raise_different() -> int: |
| try: |
| raise ValueError("original error") |
| return 1 # Never reached |
| except ValueError: |
| raise RuntimeError("different error") |
| finally: |
| await asyncio.sleep(0) |
| return 2 # Should not reach this |
| |
| # Test 4: Another try/except block inside finally |
| async def async_try_except_inside_finally() -> int: |
| try: |
| raise ValueError("outer error") |
| return 1 # Never reached |
| finally: |
| await asyncio.sleep(0) |
| try: |
| raise RuntimeError("inner error") |
| except RuntimeError: |
| pass # Catch inner error |
| return 2 # What happens after finally with inner exception handled? |
| |
| # Test 5: Another try/finally block inside finally |
| async def async_try_finally_inside_finally() -> int: |
| try: |
| raise ValueError("outer error") |
| return 1 # Never reached |
| finally: |
| await asyncio.sleep(0) |
| try: |
| raise RuntimeError("inner error") |
| finally: |
| await asyncio.sleep(0) |
| return 2 # Should not reach this |
| |
| # Control case: No await in finally - should work correctly |
| async def async_exception_no_await_in_finally() -> None: |
| """Control case: This works correctly - exception propagates""" |
| try: |
| raise TestError("This exception will propagate!") |
| finally: |
| pass # No await here |
| |
| # Test function with no exception to check normal flow |
| async def async_no_exception_with_await_in_finally() -> int: |
| try: |
| return 1 # Normal return |
| finally: |
| await asyncio.sleep(0) |
| return 2 # Should not reach this |
| |
| def test_async_try_except_finally_await() -> None: |
| # Test 0: Simplest case - just try/finally with exception |
| # Expected: ValueError propagates |
| with assertRaises(ValueError): |
| asyncio.run(simple_try_finally_await()) |
| |
| # Test 1: Exception caught, not re-raised |
| # Expected: return 2 (from except block) |
| result = asyncio.run(async_try_except_no_reraise()) |
| assert result == 2, f"Expected 2, got {result}" |
| |
| # Test 2: Exception caught and re-raised |
| # Expected: ValueError propagates |
| with assertRaises(ValueError): |
| asyncio.run(async_try_except_reraise()) |
| |
| # Test 3: Exception caught, different exception raised |
| # Expected: RuntimeError propagates |
| with assertRaises(RuntimeError): |
| asyncio.run(async_try_except_raise_different()) |
| |
| # Test 4: Try/except inside finally |
| # Expected: ValueError propagates (outer exception) |
| with assertRaises(ValueError): |
| asyncio.run(async_try_except_inside_finally()) |
| |
| # Test 5: Try/finally inside finally |
| # Expected: RuntimeError propagates (inner error) |
| with assertRaises(RuntimeError): |
| asyncio.run(async_try_finally_inside_finally()) |
| |
| # Control case: No await in finally (should work correctly) |
| with assertRaises(TestError): |
| asyncio.run(async_exception_no_await_in_finally()) |
| |
| # Test normal flow (no exception) |
| # Expected: return 1 |
| result = asyncio.run(async_no_exception_with_await_in_finally()) |
| assert result == 1, f"Expected 1, got {result}" |
| |
| [file asyncio/__init__.pyi] |
| async def sleep(t: float) -> None: ... |
| def run(x: object) -> object: ... |
| |
| [case testAsyncContextManagerExceptionHandling] |
| import asyncio |
| from typing import Optional, Type |
| from testutil import assertRaises |
| |
| # Test 1: Basic async context manager that doesn't suppress exceptions |
| class AsyncContextManager: |
| async def __aenter__(self) -> 'AsyncContextManager': |
| return self |
| |
| async def __aexit__(self, exc_type: Optional[Type[BaseException]], |
| exc_val: Optional[BaseException], |
| exc_tb: object) -> None: |
| # This await in __aexit__ is like await in finally |
| await asyncio.sleep(0) |
| # Don't suppress the exception (return None/False) |
| |
| async def func_with_async_context_manager() -> str: |
| async with AsyncContextManager(): |
| raise ValueError("Exception inside async with") |
| return "should not reach" # Never reached |
| return "should not reach either" # Never reached |
| |
| async def test_basic_exception() -> str: |
| try: |
| await func_with_async_context_manager() |
| return "func_a returned normally - bug!" |
| except ValueError: |
| return "caught ValueError - correct!" |
| except Exception as e: |
| return f"caught different exception: {type(e).__name__}" |
| |
| # Test 2: Async context manager that raises a different exception in __aexit__ |
| class AsyncContextManagerRaisesInExit: |
| async def __aenter__(self) -> 'AsyncContextManagerRaisesInExit': |
| return self |
| |
| async def __aexit__(self, exc_type: Optional[Type[BaseException]], |
| exc_val: Optional[BaseException], |
| exc_tb: object) -> None: |
| # This await in __aexit__ is like await in finally |
| await asyncio.sleep(0) |
| # Raise a different exception - this should replace the original exception |
| raise RuntimeError("Exception in __aexit__") |
| |
| async def func_with_raising_context_manager() -> str: |
| async with AsyncContextManagerRaisesInExit(): |
| raise ValueError("Original exception") |
| return "should not reach" # Never reached |
| return "should not reach either" # Never reached |
| |
| async def test_exception_in_aexit() -> str: |
| try: |
| await func_with_raising_context_manager() |
| return "func returned normally - unexpected!" |
| except RuntimeError: |
| return "caught RuntimeError - correct!" |
| except ValueError: |
| return "caught ValueError - original exception not replaced!" |
| except Exception as e: |
| return f"caught different exception: {type(e).__name__}" |
| |
| def test_async_context_manager_exception_handling() -> None: |
| # Test 1: Basic exception propagation |
| result = asyncio.run(test_basic_exception()) |
| # Expected: "caught ValueError - correct!" |
| assert result == "caught ValueError - correct!", f"Expected exception to propagate, got: {result}" |
| |
| # Test 2: Exception raised in __aexit__ replaces original exception |
| result = asyncio.run(test_exception_in_aexit()) |
| # Expected: "caught RuntimeError - correct!" |
| # (The RuntimeError from __aexit__ should replace the ValueError) |
| assert result == "caught RuntimeError - correct!", f"Expected RuntimeError from __aexit__, got: {result}" |
| |
| [file asyncio/__init__.pyi] |
| async def sleep(t: float) -> None: ... |
| def run(x: object) -> object: ... |