| import asyncio |
| import gc |
| import re |
| from concurrent.futures import ThreadPoolExecutor |
| from datetime import datetime, timezone |
| from threading import Event |
| from unittest.mock import patch |
| |
| import pytest |
| from click.testing import CliRunner |
| |
| from tests.util import DETERMINISTIC_HEADER, read_data |
| |
| try: |
| from aiohttp import web |
| from aiohttp.test_utils import AioHTTPTestCase |
| |
| import blackd |
| import blackd.client |
| except ImportError as e: |
| raise RuntimeError("Please install Black with the 'd' extra") from e |
| |
| import black |
| |
| |
| @pytest.mark.blackd |
| class BlackDTestCase(AioHTTPTestCase): |
| def tearDown(self) -> None: |
| # Work around https://github.com/python/cpython/issues/124706 |
| gc.collect() |
| super().tearDown() |
| |
| def test_blackd_main(self) -> None: |
| with patch("blackd.web.run_app"): |
| result = CliRunner().invoke(blackd.main, []) |
| if result.exception is not None: |
| raise result.exception |
| self.assertEqual(result.exit_code, 0) |
| |
| async def get_application(self) -> web.Application: |
| return blackd.make_app() |
| |
| async def test_blackd_request_needs_formatting(self) -> None: |
| response = await self.client.post("/", data=b"print('hello world')") |
| self.assertEqual(response.status, 200) |
| self.assertEqual(response.charset, "utf8") |
| self.assertEqual(await response.read(), b'print("hello world")\n') |
| |
| async def test_blackd_request_no_change(self) -> None: |
| response = await self.client.post("/", data=b'print("hello world")\n') |
| self.assertEqual(response.status, 204) |
| self.assertEqual(await response.read(), b"") |
| |
| async def test_blackd_request_syntax_error(self) -> None: |
| response = await self.client.post("/", data=b"what even ( is") |
| self.assertEqual(response.status, 400) |
| content = await response.text() |
| self.assertTrue( |
| content.startswith("Cannot parse"), |
| msg=f"Expected error to start with 'Cannot parse', got {repr(content)}", |
| ) |
| |
| async def test_blackd_unsupported_version(self) -> None: |
| response = await self.client.post( |
| "/", data=b"what", headers={blackd.PROTOCOL_VERSION_HEADER: "2"} |
| ) |
| self.assertEqual(response.status, 501) |
| |
| async def test_blackd_supported_version(self) -> None: |
| response = await self.client.post( |
| "/", data=b"what", headers={blackd.PROTOCOL_VERSION_HEADER: "1"} |
| ) |
| self.assertEqual(response.status, 200) |
| |
| async def test_blackd_invalid_python_variant(self) -> None: |
| async def check(header_value: str, expected_status: int = 400) -> None: |
| response = await self.client.post( |
| "/", |
| data=b"what", |
| headers={blackd.PYTHON_VARIANT_HEADER: header_value}, |
| ) |
| self.assertEqual(response.status, expected_status) |
| |
| await check("lol") |
| await check("ruby3.5") |
| await check("pyi3.6") |
| await check("py1.5") |
| await check("2") |
| await check("2.7") |
| await check("py2.7") |
| await check("2.8") |
| await check("py2.8") |
| await check("3.0") |
| await check("pypy3.0") |
| await check("jython3.4") |
| |
| async def test_blackd_pyi(self) -> None: |
| source, expected = read_data("cases", "stub.py") |
| response = await self.client.post( |
| "/", data=source, headers={blackd.PYTHON_VARIANT_HEADER: "pyi"} |
| ) |
| self.assertEqual(response.status, 200) |
| self.assertEqual(await response.text(), expected) |
| |
| async def test_blackd_diff(self) -> None: |
| diff_header = re.compile( |
| r"(In|Out)\t\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d\.\d\d\d\d\d\d\+\d\d:\d\d" |
| ) |
| |
| source, _ = read_data("miscellaneous", "blackd_diff") |
| expected, _ = read_data("miscellaneous", "blackd_diff.diff") |
| |
| response = await self.client.post( |
| "/", data=source, headers={blackd.DIFF_HEADER: "true"} |
| ) |
| self.assertEqual(response.status, 200) |
| |
| actual = await response.text() |
| actual = diff_header.sub(DETERMINISTIC_HEADER, actual) |
| self.assertEqual(actual, expected) |
| |
| async def test_blackd_python_variant(self) -> None: |
| code = ( |
| "def f(\n" |
| " and_has_a_bunch_of,\n" |
| " very_long_arguments_too,\n" |
| " and_lots_of_them_as_well_lol,\n" |
| " **and_very_long_keyword_arguments\n" |
| "):\n" |
| " pass\n" |
| ) |
| |
| async def check(header_value: str, expected_status: int) -> None: |
| response = await self.client.post( |
| "/", data=code, headers={blackd.PYTHON_VARIANT_HEADER: header_value} |
| ) |
| self.assertEqual( |
| response.status, expected_status, msg=await response.text() |
| ) |
| |
| await check("3.6", 200) |
| await check("py3.6", 200) |
| await check("3.6,3.7", 200) |
| await check("3.6,py3.7", 200) |
| await check("py36,py37", 200) |
| await check("36", 200) |
| await check("3.6.4", 200) |
| await check("3.4", 204) |
| await check("py3.4", 204) |
| await check("py34,py36", 204) |
| await check("34", 204) |
| |
| async def test_blackd_line_length(self) -> None: |
| response = await self.client.post( |
| "/", data=b'print("hello")\n', headers={blackd.LINE_LENGTH_HEADER: "7"} |
| ) |
| self.assertEqual(response.status, 200) |
| |
| async def test_blackd_invalid_line_length(self) -> None: |
| response = await self.client.post( |
| "/", |
| data=b'print("hello")\n', |
| headers={blackd.LINE_LENGTH_HEADER: "NaN"}, |
| ) |
| self.assertEqual(response.status, 400) |
| |
| async def test_blackd_skip_first_source_line(self) -> None: |
| invalid_first_line = b"Header will be skipped\r\ni = [1,2,3]\nj = [1,2,3]\n" |
| expected_result = b"Header will be skipped\r\ni = [1, 2, 3]\nj = [1, 2, 3]\n" |
| response = await self.client.post("/", data=invalid_first_line) |
| self.assertEqual(response.status, 400) |
| response = await self.client.post( |
| "/", |
| data=invalid_first_line, |
| headers={blackd.SKIP_SOURCE_FIRST_LINE: "true"}, |
| ) |
| self.assertEqual(response.status, 200) |
| self.assertEqual(await response.read(), expected_result) |
| |
| async def test_blackd_preview(self) -> None: |
| response = await self.client.post( |
| "/", data=b'print("hello")\n', headers={blackd.PREVIEW: "true"} |
| ) |
| self.assertEqual(response.status, 204) |
| |
| async def test_blackd_response_black_version_header(self) -> None: |
| response = await self.client.post("/") |
| self.assertIsNotNone(response.headers.get(blackd.BLACK_VERSION_HEADER)) |
| |
| async def test_cors_preflight(self) -> None: |
| response = await self.client.options( |
| "/", |
| headers={ |
| "Access-Control-Request-Method": "POST", |
| "Origin": "https://example.com", |
| "Access-Control-Request-Headers": "Content-Type", |
| }, |
| ) |
| self.assertEqual(response.status, 403) |
| self.assertIsNone(response.headers.get("Access-Control-Allow-Origin")) |
| self.assertIsNone(response.headers.get("Access-Control-Allow-Headers")) |
| self.assertIsNone(response.headers.get("Access-Control-Allow-Methods")) |
| |
| async def test_cors_requests_rejected_by_default(self) -> None: |
| response = await self.client.post( |
| "/", headers={"Origin": "https://example.com"} |
| ) |
| self.assertEqual(response.status, 403) |
| self.assertIsNone(response.headers.get("Access-Control-Allow-Origin")) |
| self.assertIsNone(response.headers.get("Access-Control-Expose-Headers")) |
| |
| async def test_blackd_format_code_limits_executor_queue(self) -> None: |
| started = Event() |
| release = Event() |
| calls: list[str] = [] |
| executor = ThreadPoolExecutor(max_workers=2) |
| executor_semaphore = asyncio.BoundedSemaphore(1) |
| |
| def slow_format(src: str, *, fast: bool, mode: black.Mode) -> str: |
| calls.append(src) |
| started.set() |
| release.wait(timeout=1) |
| return src |
| |
| try: |
| with patch("blackd.black.format_file_contents", side_effect=slow_format): |
| task1 = asyncio.create_task( |
| blackd.format_code( |
| req_str="print(1)\n", |
| fast=False, |
| mode=black.Mode(), |
| then=datetime.now(timezone.utc), |
| only_diff=False, |
| executor=executor, |
| executor_semaphore=executor_semaphore, |
| ) |
| ) |
| |
| for _ in range(20): |
| if started.is_set(): |
| break |
| await asyncio.sleep(0.01) |
| self.assertTrue(started.is_set()) |
| |
| task2 = asyncio.create_task( |
| blackd.format_code( |
| req_str="print(2)\n", |
| fast=False, |
| mode=black.Mode(), |
| then=datetime.now(timezone.utc), |
| only_diff=False, |
| executor=executor, |
| executor_semaphore=executor_semaphore, |
| ) |
| ) |
| await asyncio.sleep(0.05) |
| self.assertEqual(len(calls), 1) |
| |
| release.set() |
| self.assertEqual(await task1, "print(1)\n") |
| self.assertEqual(await task2, "print(2)\n") |
| self.assertEqual(len(calls), 2) |
| finally: |
| executor.shutdown(wait=True) |
| |
| async def test_preserves_line_endings(self) -> None: |
| for data in (b"c\r\nc\r\n", b"l\nl\n"): |
| # test preserved newlines when reformatted |
| response = await self.client.post("/", data=data + b" ") |
| self.assertEqual(await response.text(), data.decode()) |
| # test 204 when no change |
| response = await self.client.post("/", data=data) |
| self.assertEqual(response.status, 204) |
| |
| async def test_normalizes_line_endings(self) -> None: |
| for data, expected in ((b"c\r\nc\n", "c\r\nc\r\n"), (b"l\nl\r\n", "l\nl\n")): |
| response = await self.client.post("/", data=data) |
| self.assertEqual(await response.text(), expected) |
| self.assertEqual(response.status, 200) |
| |
| async def test_single_character(self) -> None: |
| response = await self.client.post("/", data="1") |
| self.assertEqual(await response.text(), "1\n") |
| self.assertEqual(response.status, 200) |
| |
| |
| @pytest.mark.blackd |
| class BlackDConfiguredCorsTestCase(AioHTTPTestCase): |
| def tearDown(self) -> None: |
| gc.collect() |
| super().tearDown() |
| |
| async def get_application(self) -> web.Application: |
| return blackd.make_app(cors_allow_origins=("https://example.com",)) |
| |
| async def test_cors_preflight(self) -> None: |
| response = await self.client.options( |
| "/", |
| headers={ |
| "Access-Control-Request-Method": "POST", |
| "Origin": "https://example.com", |
| "Access-Control-Request-Headers": "Content-Type", |
| }, |
| ) |
| self.assertEqual(response.status, 200) |
| self.assertEqual( |
| response.headers.get("Access-Control-Allow-Origin"), |
| "https://example.com", |
| ) |
| self.assertIsNotNone(response.headers.get("Access-Control-Allow-Headers")) |
| self.assertIsNotNone(response.headers.get("Access-Control-Allow-Methods")) |
| |
| async def test_cors_headers_present(self) -> None: |
| response = await self.client.post( |
| "/", headers={"Origin": "https://example.com"} |
| ) |
| self.assertEqual( |
| response.headers.get("Access-Control-Allow-Origin"), |
| "https://example.com", |
| ) |
| self.assertIn( |
| blackd.BLACK_VERSION_HEADER, |
| response.headers.get("Access-Control-Expose-Headers", ""), |
| ) |
| |
| |
| @pytest.mark.blackd |
| class BlackDSmallBodyLimitTestCase(AioHTTPTestCase): |
| def tearDown(self) -> None: |
| gc.collect() |
| super().tearDown() |
| |
| async def get_application(self) -> web.Application: |
| return blackd.make_app(max_body_size=16) |
| |
| async def test_blackd_rejects_large_request_body(self) -> None: |
| response = await self.client.post("/", data=b"x" * 17) |
| self.assertEqual(response.status, 413) |
| |
| |
| @pytest.mark.blackd |
| class BlackDClientTestCase(AioHTTPTestCase): |
| def tearDown(self) -> None: |
| # Work around https://github.com/python/cpython/issues/124706 |
| gc.collect() |
| super().tearDown() |
| |
| async def get_application(self) -> web.Application: |
| return blackd.make_app() |
| |
| async def test_unformatted_code(self) -> None: |
| client = blackd.client.BlackDClient(self.client.make_url("/")) |
| unformatted_code = "def hello(): print('Hello, World!')" |
| expected = 'def hello():\n print("Hello, World!")\n' |
| formatted_code = await client.format_code(unformatted_code) |
| |
| self.assertEqual(formatted_code, expected) |
| |
| async def test_formatted_code(self) -> None: |
| client = blackd.client.BlackDClient(self.client.make_url("/")) |
| initial_code = 'def hello():\n print("Hello, World!")\n' |
| expected = 'def hello():\n print("Hello, World!")\n' |
| formatted_code = await client.format_code(initial_code) |
| |
| self.assertEqual(formatted_code, expected) |
| |
| async def test_line_length(self) -> None: |
| client = blackd.client.BlackDClient(self.client.make_url("/"), line_length=10) |
| unformatted_code = "def hello(): print('Hello, World!')" |
| expected = 'def hello():\n print(\n "Hello, World!"\n )\n' |
| formatted_code = await client.format_code(unformatted_code) |
| |
| self.assertEqual(formatted_code, expected) |
| |
| async def test_skip_source_first_line(self) -> None: |
| client = blackd.client.BlackDClient( |
| self.client.make_url("/"), skip_source_first_line=True |
| ) |
| invalid_first_line = "Header will be skipped\r\ni = [1,2,3]\nj = [1,2,3]\n" |
| expected_result = "Header will be skipped\r\ni = [1, 2, 3]\nj = [1, 2, 3]\n" |
| formatted_code = await client.format_code(invalid_first_line) |
| |
| self.assertEqual(formatted_code, expected_result) |
| |
| async def test_skip_string_normalization(self) -> None: |
| client = blackd.client.BlackDClient( |
| self.client.make_url("/"), skip_string_normalization=True |
| ) |
| unformatted_code = "def hello(): print('Hello, World!')" |
| expected = "def hello():\n print('Hello, World!')\n" |
| formatted_code = await client.format_code(unformatted_code) |
| |
| self.assertEqual(formatted_code, expected) |
| |
| async def test_skip_magic_trailing_comma(self) -> None: |
| client = blackd.client.BlackDClient( |
| self.client.make_url("/"), skip_magic_trailing_comma=True |
| ) |
| unformatted_code = "def hello(): print('Hello, World!')" |
| expected = 'def hello():\n print("Hello, World!")\n' |
| formatted_code = await client.format_code(unformatted_code) |
| |
| self.assertEqual(formatted_code, expected) |
| |
| async def test_preview(self) -> None: |
| client = blackd.client.BlackDClient(self.client.make_url("/"), preview=True) |
| unformatted_code = "def hello(): print('Hello, World!')" |
| expected = 'def hello():\n print("Hello, World!")\n' |
| formatted_code = await client.format_code(unformatted_code) |
| |
| self.assertEqual(formatted_code, expected) |
| |
| async def test_fast(self) -> None: |
| client = blackd.client.BlackDClient(self.client.make_url("/"), fast=True) |
| unformatted_code = "def hello(): print('Hello, World!')" |
| expected = 'def hello():\n print("Hello, World!")\n' |
| formatted_code = await client.format_code(unformatted_code) |
| |
| self.assertEqual(formatted_code, expected) |
| |
| async def test_python_variant(self) -> None: |
| client = blackd.client.BlackDClient( |
| self.client.make_url("/"), python_variant="3.6" |
| ) |
| unformatted_code = "def hello(): print('Hello, World!')" |
| expected = 'def hello():\n print("Hello, World!")\n' |
| formatted_code = await client.format_code(unformatted_code) |
| |
| self.assertEqual(formatted_code, expected) |
| |
| async def test_diff(self) -> None: |
| diff_header = re.compile( |
| r"(In|Out)\t\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d\.\d\d\d\d\d\d\+\d\d:\d\d" |
| ) |
| |
| client = blackd.client.BlackDClient(self.client.make_url("/"), diff=True) |
| source, _ = read_data("miscellaneous", "blackd_diff") |
| expected, _ = read_data("miscellaneous", "blackd_diff.diff") |
| |
| diff = await client.format_code(source) |
| diff = diff_header.sub(DETERMINISTIC_HEADER, diff) |
| |
| self.assertEqual(diff, expected) |
| |
| async def test_syntax_error(self) -> None: |
| client = blackd.client.BlackDClient(self.client.make_url("/")) |
| with_syntax_error = "def hello(): a 'Hello, World!'" |
| with self.assertRaises(black.InvalidInput): |
| _ = await client.format_code(with_syntax_error) |