| [case testLibrtStrings_librt] |
| from typing import Any |
| import base64 |
| import binascii |
| import random |
| import struct |
| |
| from librt.strings import BytesWriter, StringWriter, write_i16_le, write_i16_be, read_i16_le, read_i16_be, write_i32_le, write_i32_be, read_i32_le, read_i32_be, write_i64_le, write_i64_be, read_i64_le, read_i64_be, write_f64_le, read_f64_le, write_f64_be, read_f64_be, write_f32_le, write_f32_be, read_f32_le, read_f32_be |
| |
| from testutil import assertRaises |
| |
# Sample values for the i16 write/read tests, grouped by the number of
# bytes needed to represent them.
I16_TEST_VALUES: list[int] = [
    # One byte is enough (0 to 127, and -128 to -1 going by the low byte pattern)
    0,
    1,
    127,
    -1,
    -113,  # mypyc overlapping error value
    -128,
    # Both bytes are needed (128 to 32767, -32768 to -129)
    128,
    255,
    256,
    0x1234,
    32767,
    -129,
    -256,
    -32768,
]
| |
# Sample values for the i32 write/read tests: everything from the i16 list,
# followed by values that need 3 or 4 bytes.
I32_TEST_VALUES: list[int] = I16_TEST_VALUES + [
    # Three bytes (32768 to 8388607, -8388608 to -32769)
    32768,
    65535,
    65536,
    8388607,
    -32769,
    -65536,
    -8388608,
    # Four bytes (8388608 to 2147483647, -2147483648 to -8388609)
    8388608,
    16777215,
    16777216,
    2147483647,
    -8388609,
    -16777216,
    -2147483648,
]
| |
# Sample values for the i64 write/read tests: everything from the i32 list,
# followed by values outside the i32 range (5 to 8 bytes).
I64_TEST_VALUES: list[int] = I32_TEST_VALUES + [
    # Positive values beyond i32
    2_147_483_648,  # 2^31, first value above i32 max
    4_294_967_295,  # 2^32 - 1
    4_294_967_296,  # 2^32
    1_099_511_627_775,  # 2^40 - 1
    281_474_976_710_655,  # 2^48 - 1
    72_057_594_037_927_935,  # 2^56 - 1
    9_223_372_036_854_775_807,  # i64 max
    # Negative values beyond i32
    -2_147_483_649,  # first value below i32 min
    -4_294_967_296,  # -2^32
    -9_223_372_036_854_775_808,  # i64 min
]
| |
# Sample values for the f64 write/read tests
F64_TEST_VALUES: list[float] = [
    # Zeros and units, both signs
    0.0,
    -0.0,
    1.0,
    -1.0,
    -113.0,  # mypyc overlapping error value
    # Simple fractions and small whole numbers
    0.5,
    -0.5,
    0.1,
    -0.1,
    1.5,
    255.0,
    256.0,
    # Large and tiny magnitudes
    1e10,
    -1e10,
    1e100,
    -1e100,
    1e-10,
    1e-100,
    # Limits of the finite range
    1.7976931348623157e+308,  # largest finite float
    -1.7976931348623157e+308,  # most negative finite float
    2.2250738585072014e-308,  # smallest positive normal
    5e-324,  # smallest positive subnormal
    # Non-finite values
    float('inf'),
    float('-inf'),
    float('nan'),
]
| |
# Sample values for the f32 write/read tests (all fit in a 32-bit float)
F32_TEST_VALUES: list[float] = [
    # Zeros and units, both signs
    0.0,
    -0.0,
    1.0,
    -1.0,
    -113.0,  # mypyc overlapping error value
    # Simple fractions and small whole numbers
    0.5,
    -0.5,
    0.1,
    -0.1,
    1.5,
    255.0,
    256.0,
    # Large and tiny magnitudes
    1e10,
    -1e10,
    1e-10,
    # Limits of the finite float32 range
    3.4028235e+38,  # largest finite float32
    -3.4028235e+38,  # most negative finite float32
    1.1754944e-38,  # smallest positive normal float32
    # Non-finite values
    float('inf'),
    float('-inf'),
    float('nan'),
]
| |
def test_bytes_writer_basics() -> None:
    """Check a fresh BytesWriter and the simplest append/write sequence."""
    # A new writer is empty
    empty = BytesWriter()
    assert empty.getvalue() == b""
    assert len(empty) == 0
    assert repr(empty) == "BytesWriter(b'')"

    # One appended byte followed by a two-byte write
    filled = BytesWriter()
    filled.append(ord('a'))
    filled.write(b'bc')
    assert filled.getvalue() == b"abc"
    assert repr(filled) == "BytesWriter(b'abc')"
| |
def test_bytes_writer_get_item() -> None:
    """Index a BytesWriter with valid, negative and out-of-range positions."""
    writer = BytesWriter()
    writer.write(b"foobar")

    # Indexes counted from the start
    assert writer[0] == ord("f")
    assert writer[5] == ord("r")
    # Indexes counted from the end
    assert writer[-1] == ord("r")
    assert writer[-2] == ord("a")
    assert writer[-6] == ord("f")

    # One step past either end
    with assertRaises(IndexError):
        writer[6]
    with assertRaises(IndexError):
        writer[-7]
    # Far outside the buffer in both directions
    with assertRaises(IndexError):
        writer[1 << 50]
    with assertRaises(IndexError):
        writer[-(1 << 50)]
| |
def test_bytes_writer_set_item() -> None:
    """Assign single bytes by index, including the error cases."""
    writer = BytesWriter()
    writer.write(b"foobar")

    # Overwrite the first and last byte with the extreme byte values
    writer[0] = 255
    assert writer[0] == 255
    writer[5] = 0
    assert writer[5] == 0
    assert writer.getvalue() == b"\xffooba\x00"

    # Index outside the current contents
    with assertRaises(IndexError):
        writer[6] = 0
    with assertRaises(IndexError):
        writer[-7] = 0
    with assertRaises(IndexError):
        writer[1 << 50] = 0
    with assertRaises(IndexError):
        writer[-(1 << 50)] = 0

    # Value outside 0..255
    with assertRaises(ValueError):
        writer[0] = int() - 1
    with assertRaises(ValueError):
        writer[0] = int() + 256

    # Grow the BytesWriter, then assign beyond the original length
    writer.write(b"xy" * 512)
    writer[1024 + 5] = 66
    assert writer[1024 + 5] == 66
    assert writer[0] == 255
| |
def test_bytes_writer_append_grow() -> None:
    """Append 16384 bytes one at a time and verify length and contents."""
    writer = BytesWriter()
    for n in range(16384):
        writer.append((n ^ (n >> 8)) & 255)
        # Length is tracked after every single append
        assert len(writer) == n + 1
    contents = writer.getvalue()
    for n in range(16384):
        assert contents[n] == (n ^ (n >> 8)) & 255
| |
def test_bytes_writer_write_grow() -> None:
    """Fill a BytesWriter through many write() calls and verify the contents."""
    # One byte per write() call
    writer = BytesWriter()
    for n in range(16384):
        writer.write(bytes([(n ^ (n >> 8)) & 255]))
    contents = writer.getvalue()
    for n in range(16384):
        assert contents[n] == (n ^ (n >> 8)) & 255
        assert contents[n] == writer[n]

    # Alternating multi-byte segments, each preceded by an empty write
    writer = BytesWriter()
    pieces = []
    for n in range(16384):
        writer.write(bytes())
        piece = b"foobarz" if n & 1 == 0 else b"\x7f\x00ab!"
        writer.write(piece)
        pieces.append(piece)
    assert writer.getvalue() == b"".join(pieces)
| |
def test_bytes_writer_truncate() -> None:
    """Shrink a BytesWriter with truncate() and check the rejected sizes."""
    writer = BytesWriter()
    writer.write(b"foobar")

    # Truncating to the current length changes nothing
    writer.truncate(6)
    assert len(writer) == 6
    assert writer.getvalue() == b"foobar"

    # Drop the last byte
    writer.truncate(5)
    assert len(writer) == 5
    assert writer.getvalue() == b"fooba"

    # Drop everything
    writer.truncate(0)
    assert len(writer) == 0
    assert writer.getvalue() == b""

    # A size larger than the current length is rejected
    with assertRaises(ValueError):
        writer.truncate(1)

    # A negative size is rejected
    writer = BytesWriter()
    writer.write(b"foobar")
    with assertRaises(ValueError):
        writer.truncate(-1)
| |
def test_bytes_writer_write_i16_le() -> None:
    """Write little-endian i16 values and compare with struct.pack output."""
    # Every sample value, from the 1-byte to the 2-byte range
    writer = BytesWriter()
    for value in I16_TEST_VALUES:
        write_i16_le(writer, value)
    assert writer.getvalue() == b"".join(struct.pack("<h", value) for value in I16_TEST_VALUES)

    # Interleave with single-byte appends while the buffer grows
    mixed = BytesWriter()
    mixed.append(0xFF)
    for n in range(1000):
        write_i16_le(mixed, n)
    mixed.append(0xEE)
    out = mixed.getvalue()
    assert len(out) == 2002
    assert out[0] == 0xFF
    assert out[-1] == 0xEE
    # Spot-check the first, second and last encoded value
    assert out[1:3] == b"\x00\x00"  # n=0
    assert out[3:5] == b"\x01\x00"  # n=1
    assert out[1999:2001] == b"\xe7\x03"  # n=999
| |
def test_write_i16_via_any() -> None:
    """Call write_i16_le/be through Any-typed values.

    This checks the C extension wrapper, i.e. the fallback path taken when
    mypyc primitives are not used.
    """
    for write_fn, fmt in zip((write_i16_le, write_i16_be), ("<h", ">h")):
        # Mix 8-bit appends with one 16-bit write
        writer: Any = BytesWriter()
        writer.append(0x42)
        write_fn(writer, 0x1234)
        writer.append(0xFF)
        assert writer.getvalue() == b"\x42" + struct.pack(fmt, 0x1234) + b"\xFF"

        # Buffer growth
        grown: Any = BytesWriter()
        for n in range(150):
            write_fn(grown, n)
        out = grown.getvalue()
        assert len(out) == 300
        assert out[0:2] == struct.pack(fmt, 0)  # n=0
        assert out[2:4] == struct.pack(fmt, 1)  # n=1
        assert out[298:300] == struct.pack(fmt, 149)  # n=149

        # Values that do not fit in i16
        rejecting: Any = BytesWriter()
        with assertRaises(ValueError, "int too large to convert to i16"):
            write_fn(rejecting, 32768 + int())
        with assertRaises(ValueError, "int too large to convert to i16"):
            write_fn(rejecting, -32769 + int())
        with assertRaises(ValueError, "int too large to convert to i16"):
            write_fn(rejecting, 100000 + int())
| |
def test_bytes_reader_read_i16_le() -> None:
    """Read little-endian i16 values from struct-packed and writer-produced bytes."""
    # Decode every sample value from struct-packed input
    packed = b"".join(struct.pack("<h", value) for value in I16_TEST_VALUES)
    for pos, value in enumerate(I16_TEST_VALUES):
        assert read_i16_le(packed, pos * 2) == value

    # Round-trip through write_i16_le
    writer = BytesWriter()
    for value in I16_TEST_VALUES:
        write_i16_le(writer, value)
    written = writer.getvalue()
    for pos, value in enumerate(I16_TEST_VALUES):
        assert read_i16_le(written, pos * 2) == value

    # Unaligned (odd) offset
    padded = b"\xFF" + struct.pack("<h", 0x1234) + b"\xFF"
    assert read_i16_le(padded, 1) == 0x1234

    # Barely out of bounds: 1 byte left, 2 needed
    with assertRaises(IndexError):
        read_i16_le(packed, len(packed) - 1)
| |
def test_bytes_reader_read_i16_be() -> None:
    """Read big-endian i16 values from struct-packed and writer-produced bytes."""
    # Decode every sample value from struct-packed input
    packed = b"".join(struct.pack(">h", value) for value in I16_TEST_VALUES)
    for pos, value in enumerate(I16_TEST_VALUES):
        assert read_i16_be(packed, pos * 2) == value

    # Round-trip through write_i16_be
    writer = BytesWriter()
    for value in I16_TEST_VALUES:
        write_i16_be(writer, value)
    written = writer.getvalue()
    for pos, value in enumerate(I16_TEST_VALUES):
        assert read_i16_be(written, pos * 2) == value

    # Unaligned (odd) offset
    padded = b"\xFF" + struct.pack(">h", 0x1234) + b"\xFF"
    assert read_i16_be(padded, 1) == 0x1234

    # Barely out of bounds: 1 byte left, 2 needed
    with assertRaises(IndexError):
        read_i16_be(packed, len(packed) - 1)
| |
def test_read_i16_via_any() -> None:
    """Call read_i16_le/be through Any-typed values to check the C extension wrapper."""
    for read_fn, fmt in zip((read_i16_le, read_i16_be), ("<h", ">h")):
        # Three packed values: 0x1234, -1 and 0
        data: Any = struct.pack(fmt, 0x1234) + struct.pack(fmt, -1) + struct.pack(fmt, 0)
        assert read_fn(data, 0) == 0x1234
        assert read_fn(data, 2) == -1
        assert read_fn(data, 4) == 0

        # Error cases
        # Index past the end
        with assertRaises(IndexError, "index 10 out of range for bytes of length 6"):
            read_fn(data, 10 + int())
        # Index inside the data, but too few bytes left for an i16
        with assertRaises(IndexError, "index 5 out of range for bytes of length 6"):
            read_fn(data, 5 + int())

        # Negative index
        with assertRaises(ValueError, "index must be non-negative"):
            read_fn(data, -1 + int())

        # Wrong type for the bytes argument: str, then bytearray
        with assertRaises(TypeError):
            as_str: Any = "not bytes"
            read_fn(as_str, 0 + int())
        with assertRaises(TypeError):
            as_bytearray: Any = bytearray(b"\x00\x00")
            read_fn(as_bytearray, 0 + int())
| |
def test_bytes_writer_write_i16_be() -> None:
    """Write big-endian i16 values and compare with struct.pack output."""
    # Every sample value, from the 1-byte to the 2-byte range
    writer = BytesWriter()
    for value in I16_TEST_VALUES:
        write_i16_be(writer, value)
    assert writer.getvalue() == b"".join(struct.pack(">h", value) for value in I16_TEST_VALUES)

    # Interleave with single-byte appends while the buffer grows
    mixed = BytesWriter()
    mixed.append(0xFF)
    for n in range(1000):
        write_i16_be(mixed, n)
    mixed.append(0xEE)
    out = mixed.getvalue()
    assert len(out) == 2002
    assert out[0] == 0xFF
    assert out[-1] == 0xEE
    # Spot-check the first, second and last encoded value
    assert out[1:3] == b"\x00\x00"  # n=0
    assert out[3:5] == b"\x00\x01"  # n=1
    assert out[1999:2001] == b"\x03\xe7"  # n=999
| |
def test_bytes_writer_write_i32_le() -> None:
    """Write little-endian i32 values and compare with struct.pack output."""
    # Every sample value, from the 1-byte to the 4-byte range
    writer = BytesWriter()
    for value in I32_TEST_VALUES:
        write_i32_le(writer, value)
    assert writer.getvalue() == b"".join(struct.pack("<i", value) for value in I32_TEST_VALUES)

    # Interleave with single-byte appends while the buffer grows
    mixed = BytesWriter()
    mixed.append(0xFF)
    for n in range(500):
        write_i32_le(mixed, n * 1000)
    mixed.append(0xEE)
    out = mixed.getvalue()
    assert len(out) == 2002
    assert out[0] == 0xFF
    assert out[-1] == 0xEE
    # Spot-check the first, second and last encoded value
    assert out[1:5] == struct.pack("<i", 0)
    assert out[5:9] == struct.pack("<i", 1000)
    assert out[1997:2001] == struct.pack("<i", 499000)
| |
def test_bytes_writer_write_i32_be() -> None:
    """Write big-endian i32 values and compare with struct.pack output."""
    # Every sample value, from the 1-byte to the 4-byte range
    writer = BytesWriter()
    for value in I32_TEST_VALUES:
        write_i32_be(writer, value)
    assert writer.getvalue() == b"".join(struct.pack(">i", value) for value in I32_TEST_VALUES)

    # Interleave with single-byte appends while the buffer grows
    mixed = BytesWriter()
    mixed.append(0xFF)
    for n in range(500):
        write_i32_be(mixed, n * 1000)
    mixed.append(0xEE)
    out = mixed.getvalue()
    assert len(out) == 2002
    assert out[0] == 0xFF
    assert out[-1] == 0xEE
    # Spot-check the first, second and last encoded value
    assert out[1:5] == struct.pack(">i", 0)
    assert out[5:9] == struct.pack(">i", 1000)
    assert out[1997:2001] == struct.pack(">i", 499000)
| |
def test_write_i32_via_any() -> None:
    """Call write_i32_le/be through Any-typed values.

    This checks the C extension wrapper, i.e. the fallback path taken when
    mypyc primitives are not used.
    """
    for write_fn, fmt in zip((write_i32_le, write_i32_be), ("<i", ">i")):
        # Mix 8-bit appends with one 32-bit write
        writer: Any = BytesWriter()
        writer.append(0x42)
        write_fn(writer, 0x12345678)
        writer.append(0xFF)
        assert writer.getvalue() == b"\x42" + struct.pack(fmt, 0x12345678) + b"\xFF"

        # Buffer growth
        grown: Any = BytesWriter()
        for n in range(100):
            write_fn(grown, n * 10000)
        out = grown.getvalue()
        assert len(out) == 400
        assert out[0:4] == struct.pack(fmt, 0)
        assert out[4:8] == struct.pack(fmt, 10000)
        assert out[396:400] == struct.pack(fmt, 990000)

        # Values that do not fit in i32
        rejecting: Any = BytesWriter()
        with assertRaises(ValueError, "int too large to convert to i32"):
            write_fn(rejecting, 2147483648 + int())
        with assertRaises(ValueError, "int too large to convert to i32"):
            write_fn(rejecting, -2147483649 + int())
        with assertRaises(ValueError, "int too large to convert to i32"):
            write_fn(rejecting, 10000000000 + int())
| |
def test_bytes_reader_read_i32_le() -> None:
    """Read little-endian i32 values from struct-packed and writer-produced bytes."""
    # Decode every sample value from struct-packed input
    packed = b"".join(struct.pack("<i", value) for value in I32_TEST_VALUES)
    for pos, value in enumerate(I32_TEST_VALUES):
        assert read_i32_le(packed, pos * 4) == value

    # Round-trip through write_i32_le
    writer = BytesWriter()
    for value in I32_TEST_VALUES:
        write_i32_le(writer, value)
    written = writer.getvalue()
    for pos, value in enumerate(I32_TEST_VALUES):
        assert read_i32_le(written, pos * 4) == value

    # Unaligned offset
    padded = b"\xFF" + struct.pack("<i", 0x12345678) + b"\xFF"
    assert read_i32_le(padded, 1) == 0x12345678

    # Barely out of bounds: 3 bytes left, 4 needed
    with assertRaises(IndexError):
        read_i32_le(packed, len(packed) - 3)
| |
def test_bytes_reader_read_i32_be() -> None:
    """Read big-endian i32 values from struct-packed and writer-produced bytes."""
    # Decode every sample value from struct-packed input
    packed = b"".join(struct.pack(">i", value) for value in I32_TEST_VALUES)
    for pos, value in enumerate(I32_TEST_VALUES):
        assert read_i32_be(packed, pos * 4) == value

    # Round-trip through write_i32_be
    writer = BytesWriter()
    for value in I32_TEST_VALUES:
        write_i32_be(writer, value)
    written = writer.getvalue()
    for pos, value in enumerate(I32_TEST_VALUES):
        assert read_i32_be(written, pos * 4) == value

    # Unaligned offset
    padded = b"\xFF" + struct.pack(">i", 0x12345678) + b"\xFF"
    assert read_i32_be(padded, 1) == 0x12345678

    # Barely out of bounds: 3 bytes left, 4 needed
    with assertRaises(IndexError):
        read_i32_be(packed, len(packed) - 3)
| |
def test_read_i32_via_any() -> None:
    """Call read_i32_le/be through Any-typed values to check the C extension wrapper."""
    for read_fn, fmt in zip((read_i32_le, read_i32_be), ("<i", ">i")):
        # Three packed values: 0x12345678, -1 and 0
        data: Any = struct.pack(fmt, 0x12345678) + struct.pack(fmt, -1) + struct.pack(fmt, 0)
        assert read_fn(data, 0) == 0x12345678
        assert read_fn(data, 4) == -1
        assert read_fn(data, 8) == 0

        # Error cases
        # Index past the end
        with assertRaises(IndexError, "index 20 out of range for bytes of length 12"):
            read_fn(data, 20 + int())
        # Index inside the data, but too few bytes left for an i32
        with assertRaises(IndexError, "index 9 out of range for bytes of length 12"):
            read_fn(data, 9 + int())

        # Negative index
        with assertRaises(ValueError, "index must be non-negative"):
            read_fn(data, -1 + int())

        # Wrong type for the bytes argument: str, then bytearray
        with assertRaises(TypeError):
            as_str: Any = "not bytes"
            read_fn(as_str, 0 + int())
        with assertRaises(TypeError):
            as_bytearray: Any = bytearray(b"\x00\x00\x00\x00")
            read_fn(as_bytearray, 0 + int())
| |
def test_bytes_writer_write_i64_le() -> None:
    """Write little-endian i64 values and compare with struct.pack output."""
    # Every i64 sample value (the i32 values plus the 5-8 byte ones)
    writer = BytesWriter()
    for value in I64_TEST_VALUES:
        write_i64_le(writer, value)
    assert writer.getvalue() == b"".join(struct.pack("<q", value) for value in I64_TEST_VALUES)

    # Interleave with single-byte appends while the buffer grows
    mixed = BytesWriter()
    mixed.append(0xFF)
    for n in range(200):
        write_i64_le(mixed, n * 1000000000)
    mixed.append(0xEE)
    out = mixed.getvalue()
    assert len(out) == 1602
    assert out[0] == 0xFF
    assert out[-1] == 0xEE
    # Spot-check the first, second and last encoded value
    assert out[1:9] == struct.pack("<q", 0)
    assert out[9:17] == struct.pack("<q", 1000000000)
    assert out[1593:1601] == struct.pack("<q", 199000000000)
| |
def test_bytes_writer_write_i64_be() -> None:
    """Write big-endian i64 values and compare with struct.pack output."""
    # Every i64 sample value (the i32 values plus the 5-8 byte ones)
    writer = BytesWriter()
    for value in I64_TEST_VALUES:
        write_i64_be(writer, value)
    assert writer.getvalue() == b"".join(struct.pack(">q", value) for value in I64_TEST_VALUES)

    # Interleave with single-byte appends while the buffer grows
    mixed = BytesWriter()
    mixed.append(0xFF)
    for n in range(200):
        write_i64_be(mixed, n * 1000000000)
    mixed.append(0xEE)
    out = mixed.getvalue()
    assert len(out) == 1602
    assert out[0] == 0xFF
    assert out[-1] == 0xEE
    # Spot-check the first, second and last encoded value
    assert out[1:9] == struct.pack(">q", 0)
    assert out[9:17] == struct.pack(">q", 1000000000)
    assert out[1593:1601] == struct.pack(">q", 199000000000)
| |
def test_write_i64_via_any() -> None:
    """Call write_i64_le/be through Any-typed values to check the C extension wrapper."""
    for write_fn, fmt in zip((write_i64_le, write_i64_be), ("<q", ">q")):
        # Mix 8-bit appends with one 64-bit write
        writer: Any = BytesWriter()
        writer.append(0x42)
        write_fn(writer, 0x123456789ABCDEF0)
        writer.append(0xFF)
        assert writer.getvalue() == b"\x42" + struct.pack(fmt, 0x123456789ABCDEF0) + b"\xFF"

        # Buffer growth
        grown: Any = BytesWriter()
        for n in range(50):
            write_fn(grown, n * 10000000000)
        out = grown.getvalue()
        assert len(out) == 400
        assert out[0:8] == struct.pack(fmt, 0)
        assert out[8:16] == struct.pack(fmt, 10000000000)
        assert out[392:400] == struct.pack(fmt, 490000000000)
| |
def test_bytes_reader_read_i64_le() -> None:
    """Read little-endian i64 values from struct-packed and writer-produced bytes."""
    # Decode every i64 sample value (the i32 values plus the 5-8 byte ones)
    packed = b"".join(struct.pack("<q", value) for value in I64_TEST_VALUES)
    for pos, value in enumerate(I64_TEST_VALUES):
        assert read_i64_le(packed, pos * 8) == value

    # Round-trip through write_i64_le
    writer = BytesWriter()
    for value in I64_TEST_VALUES:
        write_i64_le(writer, value)
    written = writer.getvalue()
    for pos, value in enumerate(I64_TEST_VALUES):
        assert read_i64_le(written, pos * 8) == value

    # Unaligned offset
    padded = b"\xFF" + struct.pack("<q", 0x123456789ABCDEF0) + b"\xFF"
    assert read_i64_le(padded, 1) == 0x123456789ABCDEF0

    # Barely out of bounds: 7 bytes left, 8 needed
    with assertRaises(IndexError):
        read_i64_le(packed, len(packed) - 7)
| |
def test_bytes_reader_read_i64_be() -> None:
    """Read big-endian i64 values from struct-packed and writer-produced bytes."""
    # Decode every i64 sample value (the i32 values plus the 5-8 byte ones)
    packed = b"".join(struct.pack(">q", value) for value in I64_TEST_VALUES)
    for pos, value in enumerate(I64_TEST_VALUES):
        assert read_i64_be(packed, pos * 8) == value

    # Round-trip through write_i64_be
    writer = BytesWriter()
    for value in I64_TEST_VALUES:
        write_i64_be(writer, value)
    written = writer.getvalue()
    for pos, value in enumerate(I64_TEST_VALUES):
        assert read_i64_be(written, pos * 8) == value

    # Unaligned offset
    padded = b"\xFF" + struct.pack(">q", 0x123456789ABCDEF0) + b"\xFF"
    assert read_i64_be(padded, 1) == 0x123456789ABCDEF0

    # Barely out of bounds: 7 bytes left, 8 needed
    with assertRaises(IndexError):
        read_i64_be(packed, len(packed) - 7)
| |
def test_read_i64_via_any() -> None:
    """Call read_i64_le/be through Any-typed values to check the C extension wrapper."""
    for read_fn, fmt in zip((read_i64_le, read_i64_be), ("<q", ">q")):
        # Three packed values: 0x123456789ABCDEF0, -1 and 0
        data: Any = struct.pack(fmt, 0x123456789ABCDEF0) + struct.pack(fmt, -1) + struct.pack(fmt, 0)
        assert read_fn(data, 0) == 0x123456789ABCDEF0
        assert read_fn(data, 8) == -1
        assert read_fn(data, 16) == 0

        # Error cases
        # Index past the end
        with assertRaises(IndexError, "index 30 out of range for bytes of length 24"):
            read_fn(data, 30 + int())
        # Index inside the data, but too few bytes left for an i64
        with assertRaises(IndexError, "index 17 out of range for bytes of length 24"):
            read_fn(data, 17 + int())

        # Negative index
        with assertRaises(ValueError, "index must be non-negative"):
            read_fn(data, -1 + int())

        # Wrong type for the bytes argument: str, then bytearray
        with assertRaises(TypeError):
            as_str: Any = "not bytes"
            read_fn(as_str, 0 + int())
        with assertRaises(TypeError):
            as_bytearray: Any = bytearray(b"\x00" * 8)
            read_fn(as_bytearray, 0 + int())
| |
def test_bytes_writer_write_f64_le() -> None:
    """Write little-endian f64 values and compare with struct.pack output."""
    # Every f64 sample value
    writer = BytesWriter()
    for value in F64_TEST_VALUES:
        write_f64_le(writer, value)
    actual = writer.getvalue()
    reference = b"".join(struct.pack("<d", value) for value in F64_TEST_VALUES)
    # Compare the raw bytes (NaN != NaN, but the encoded bytes should match)
    assert actual == reference

    # Interleave with single-byte appends while the buffer grows
    mixed = BytesWriter()
    mixed.append(0xFF)
    for n in range(200):
        write_f64_le(mixed, float(n) * 1.5)
    mixed.append(0xEE)
    out = mixed.getvalue()
    assert len(out) == 1602
    assert out[0] == 0xFF
    assert out[-1] == 0xEE
    # Spot-check the first, second and last encoded value
    assert out[1:9] == struct.pack("<d", 0.0)
    assert out[9:17] == struct.pack("<d", 1.5)
    assert out[1593:1601] == struct.pack("<d", 199.0 * 1.5)
| |
def test_bytes_writer_write_f64_be() -> None:
    """Write big-endian f64 values and compare with struct.pack output."""
    # Every f64 sample value
    writer = BytesWriter()
    for value in F64_TEST_VALUES:
        write_f64_be(writer, value)
    actual = writer.getvalue()
    reference = b"".join(struct.pack(">d", value) for value in F64_TEST_VALUES)
    # Compare the raw bytes (NaN != NaN, but the encoded bytes should match)
    assert actual == reference

    # Interleave with single-byte appends while the buffer grows
    mixed = BytesWriter()
    mixed.append(0xFF)
    for n in range(200):
        write_f64_be(mixed, float(n) * 1.5)
    mixed.append(0xEE)
    out = mixed.getvalue()
    assert len(out) == 1602
    assert out[0] == 0xFF
    assert out[-1] == 0xEE
    # Spot-check the first, second and last encoded value
    assert out[1:9] == struct.pack(">d", 0.0)
    assert out[9:17] == struct.pack(">d", 1.5)
    assert out[1593:1601] == struct.pack(">d", 199.0 * 1.5)
| |
def test_bytes_reader_read_f64_le() -> None:
    """Read little-endian f64 values from struct-packed and writer-produced bytes."""
    import math
    # Decode every f64 sample value; NaN needs isnan() since NaN != NaN
    packed = b"".join(struct.pack("<d", value) for value in F64_TEST_VALUES)
    for pos, value in enumerate(F64_TEST_VALUES):
        decoded = read_f64_le(packed, pos * 8)
        if not math.isnan(value):
            assert decoded == value
        else:
            assert math.isnan(decoded)

    # Round-trip through write_f64_le
    writer = BytesWriter()
    for value in F64_TEST_VALUES:
        write_f64_le(writer, value)
    written = writer.getvalue()
    for pos, value in enumerate(F64_TEST_VALUES):
        decoded = read_f64_le(written, pos * 8)
        if not math.isnan(value):
            assert decoded == value
        else:
            assert math.isnan(decoded)

    # Unaligned offset
    padded = b"\xFF" + struct.pack("<d", 1.5) + b"\xFF"
    assert read_f64_le(padded, 1) == 1.5

    # Barely out of bounds: 7 bytes left, 8 needed
    with assertRaises(IndexError):
        read_f64_le(packed, len(packed) - 7)
| |
def test_bytes_reader_read_f64_be() -> None:
    """Read big-endian f64 values from struct-packed and writer-produced bytes."""
    import math
    # Decode every f64 sample value; NaN needs isnan() since NaN != NaN
    packed = b"".join(struct.pack(">d", value) for value in F64_TEST_VALUES)
    for pos, value in enumerate(F64_TEST_VALUES):
        decoded = read_f64_be(packed, pos * 8)
        if not math.isnan(value):
            assert decoded == value
        else:
            assert math.isnan(decoded)

    # Round-trip through write_f64_be
    writer = BytesWriter()
    for value in F64_TEST_VALUES:
        write_f64_be(writer, value)
    written = writer.getvalue()
    for pos, value in enumerate(F64_TEST_VALUES):
        decoded = read_f64_be(written, pos * 8)
        if not math.isnan(value):
            assert decoded == value
        else:
            assert math.isnan(decoded)

    # Unaligned offset
    padded = b"\xFF" + struct.pack(">d", 1.5) + b"\xFF"
    assert read_f64_be(padded, 1) == 1.5

    # Barely out of bounds: 7 bytes left, 8 needed
    with assertRaises(IndexError):
        read_f64_be(packed, len(packed) - 7)
| |
def test_read_f64_via_any() -> None:
    """Call read_f64_le/be through Any-typed values to check the C extension wrapper."""
    import math
    for read_fn, fmt in zip((read_f64_le, read_f64_be), ("<d", ">d")):
        # Three packed finite values: 1.5, -113.0 and 0.0
        data: Any = struct.pack(fmt, 1.5) + struct.pack(fmt, -113.0) + struct.pack(fmt, 0.0)
        assert read_fn(data, 0) == 1.5
        assert read_fn(data, 8) == -113.0
        assert read_fn(data, 16) == 0.0

        # Special values: infinity and NaN
        special: Any = struct.pack(fmt, float('inf')) + struct.pack(fmt, float('nan'))
        assert read_fn(special, 0) == float('inf')
        assert math.isnan(read_fn(special, 8))

        # Error cases
        # Index past the end
        with assertRaises(IndexError, "index 30 out of range for bytes of length 24"):
            read_fn(data, 30 + int())
        # Index inside the data, but too few bytes left for an f64
        with assertRaises(IndexError, "index 17 out of range for bytes of length 24"):
            read_fn(data, 17 + int())

        # Negative index
        with assertRaises(ValueError, "index must be non-negative"):
            read_fn(data, -1 + int())

        # Wrong type for the bytes argument: str, then bytearray
        with assertRaises(TypeError):
            as_str: Any = "not bytes"
            read_fn(as_str, 0 + int())
        with assertRaises(TypeError):
            as_bytearray: Any = bytearray(b"\x00" * 8)
            read_fn(as_bytearray, 0 + int())
| |
def test_write_f64_via_any() -> None:
    """Call write_f64_le/be through Any-typed values to check the C extension wrapper."""
    import math
    for write_fn, fmt in zip((write_f64_le, write_f64_be), ("<d", ">d")):
        # Mix 8-bit appends with one 64-bit float write
        writer: Any = BytesWriter()
        writer.append(0x42)
        write_fn(writer, 1.5)
        writer.append(0xFF)
        assert writer.getvalue() == b"\x42" + struct.pack(fmt, 1.5) + b"\xFF"

        # Buffer growth
        grown: Any = BytesWriter()
        for n in range(50):
            write_fn(grown, float(n) * 0.25)
        out = grown.getvalue()
        assert len(out) == 400
        assert out[0:8] == struct.pack(fmt, 0.0)
        assert out[8:16] == struct.pack(fmt, 0.25)
        assert out[392:400] == struct.pack(fmt, 49.0 * 0.25)

        # Special values: both infinities and NaN
        special: Any = BytesWriter()
        write_fn(special, float('inf'))
        write_fn(special, float('-inf'))
        write_fn(special, float('nan'))
        out = special.getvalue()
        assert out[0:8] == struct.pack(fmt, float('inf'))
        assert out[8:16] == struct.pack(fmt, float('-inf'))
        # NaN is checked by decoding the bytes rather than comparing them
        assert math.isnan(struct.unpack(fmt, out[16:24])[0])

        # A non-float argument is rejected
        rejecting: Any = BytesWriter()
        with assertRaises(TypeError):
            not_a_float: Any = "not a float" + str()
            write_fn(rejecting, not_a_float)
| |
def test_bytes_writer_write_f32_le() -> None:
    """Write little-endian f32 values and compare with struct.pack output."""
    import struct as s
    # Every f32 sample value
    writer = BytesWriter()
    for value in F32_TEST_VALUES:
        write_f32_le(writer, value)
    actual = writer.getvalue()
    reference = b"".join(s.pack("<f", value) for value in F32_TEST_VALUES)
    # Compare the raw bytes (NaN != NaN, but the encoded bytes should match)
    assert actual == reference

    # Interleave with single-byte appends while the buffer grows
    mixed = BytesWriter()
    mixed.append(0xFF)
    for n in range(200):
        write_f32_le(mixed, float(n) * 1.5)
    mixed.append(0xEE)
    out = mixed.getvalue()
    assert len(out) == 802
    assert out[0] == 0xFF
    assert out[-1] == 0xEE
    # Spot-check the first, second and last encoded value
    assert out[1:5] == s.pack("<f", 0.0)
    assert out[5:9] == s.pack("<f", 1.5)
    assert out[797:801] == s.pack("<f", 199.0 * 1.5)
| |
def test_bytes_writer_write_f32_be() -> None:
    """Write big-endian f32 values and compare with struct.pack output."""
    import struct as s
    # Every f32 sample value
    writer = BytesWriter()
    for value in F32_TEST_VALUES:
        write_f32_be(writer, value)
    actual = writer.getvalue()
    reference = b"".join(s.pack(">f", value) for value in F32_TEST_VALUES)
    # Compare the raw bytes (NaN != NaN, but the encoded bytes should match)
    assert actual == reference

    # Interleave with single-byte appends while the buffer grows
    mixed = BytesWriter()
    mixed.append(0xFF)
    for n in range(200):
        write_f32_be(mixed, float(n) * 1.5)
    mixed.append(0xEE)
    out = mixed.getvalue()
    assert len(out) == 802
    assert out[0] == 0xFF
    assert out[-1] == 0xEE
    # Spot-check the first, second and last encoded value
    assert out[1:5] == s.pack(">f", 0.0)
    assert out[5:9] == s.pack(">f", 1.5)
    assert out[797:801] == s.pack(">f", 199.0 * 1.5)
| |
def test_write_f32_via_any() -> None:
    """Call write_f32_le/be through Any-typed values to check the C extension wrapper."""
    import math
    for write_fn, fmt in zip((write_f32_le, write_f32_be), ("<f", ">f")):
        # Mix 8-bit appends with one 32-bit float write
        writer: Any = BytesWriter()
        writer.append(0x42)
        write_fn(writer, 1.5)
        writer.append(0xFF)
        assert writer.getvalue() == b"\x42" + struct.pack(fmt, 1.5) + b"\xFF"

        # Buffer growth
        grown: Any = BytesWriter()
        for n in range(100):
            write_fn(grown, float(n) * 0.25)
        out = grown.getvalue()
        assert len(out) == 400
        assert out[0:4] == struct.pack(fmt, 0.0)
        assert out[4:8] == struct.pack(fmt, 0.25)
        assert out[396:400] == struct.pack(fmt, 99.0 * 0.25)

        # Special values: both infinities and NaN
        special: Any = BytesWriter()
        write_fn(special, float('inf'))
        write_fn(special, float('-inf'))
        write_fn(special, float('nan'))
        out = special.getvalue()
        assert out[0:4] == struct.pack(fmt, float('inf'))
        assert out[4:8] == struct.pack(fmt, float('-inf'))
        # NaN is checked by decoding the bytes rather than comparing them
        assert math.isnan(struct.unpack(fmt, out[8:12])[0])

        # A non-float argument is rejected
        rejecting: Any = BytesWriter()
        with assertRaises(TypeError):
            not_a_float: Any = "not a float" + str()
            write_fn(rejecting, not_a_float)
| |
def test_bytes_reader_read_f32_le() -> None:
    """Read little-endian f32 values from struct-packed and writer-produced bytes."""
    import math
    import struct as s
    # Decode every f32 sample value. The reference is the value rounded to
    # f32 via a pack/unpack round trip, because some f64 values such as 0.1
    # lose precision when stored as f32.
    packed = b"".join(s.pack("<f", value) for value in F32_TEST_VALUES)
    for pos, value in enumerate(F32_TEST_VALUES):
        decoded = read_f32_le(packed, pos * 4)
        rounded = s.unpack("<f", s.pack("<f", value))[0]
        if not math.isnan(value):
            assert decoded == rounded
        else:
            assert math.isnan(decoded)

    # Round-trip through write_f32_le
    writer = BytesWriter()
    for value in F32_TEST_VALUES:
        write_f32_le(writer, value)
    written = writer.getvalue()
    for pos, value in enumerate(F32_TEST_VALUES):
        decoded = read_f32_le(written, pos * 4)
        rounded = s.unpack("<f", s.pack("<f", value))[0]
        if not math.isnan(value):
            assert decoded == rounded
        else:
            assert math.isnan(decoded)

    # Unaligned offset
    padded = b"\xFF" + s.pack("<f", 1.5) + b"\xFF"
    assert read_f32_le(padded, 1) == 1.5

    # Barely out of bounds: 3 bytes left, 4 needed
    with assertRaises(IndexError):
        read_f32_le(packed, len(packed) - 3)
| |
def test_bytes_reader_read_f32_be() -> None:
    """Read big-endian f32 values from struct-packed and writer-produced bytes."""
    import math
    import struct as s
    # Decode every f32 sample value. The reference is the value rounded to
    # f32 via a pack/unpack round trip, because some f64 values such as 0.1
    # lose precision when stored as f32.
    packed = b"".join(s.pack(">f", value) for value in F32_TEST_VALUES)
    for pos, value in enumerate(F32_TEST_VALUES):
        decoded = read_f32_be(packed, pos * 4)
        rounded = s.unpack(">f", s.pack(">f", value))[0]
        if not math.isnan(value):
            assert decoded == rounded
        else:
            assert math.isnan(decoded)

    # Round-trip through write_f32_be
    writer = BytesWriter()
    for value in F32_TEST_VALUES:
        write_f32_be(writer, value)
    written = writer.getvalue()
    for pos, value in enumerate(F32_TEST_VALUES):
        decoded = read_f32_be(written, pos * 4)
        rounded = s.unpack(">f", s.pack(">f", value))[0]
        if not math.isnan(value):
            assert decoded == rounded
        else:
            assert math.isnan(decoded)

    # Unaligned offset
    padded = b"\xFF" + s.pack(">f", 1.5) + b"\xFF"
    assert read_f32_be(padded, 1) == 1.5

    # Barely out of bounds: 3 bytes left, 4 needed
    with assertRaises(IndexError):
        read_f32_be(packed, len(packed) - 3)
| |
def test_read_f32_via_any() -> None:
    """Call read_f32_le/be through Any-typed values to check the C extension wrapper."""
    import math
    for read_fn, fmt in zip((read_f32_le, read_f32_be), ("<f", ">f")):
        # Three packed finite values: 1.5, -113.0 and 0.0
        data: Any = struct.pack(fmt, 1.5) + struct.pack(fmt, -113.0) + struct.pack(fmt, 0.0)
        assert read_fn(data, 0) == 1.5
        assert read_fn(data, 4) == -113.0
        assert read_fn(data, 8) == 0.0

        # Special values: infinity and NaN
        special: Any = struct.pack(fmt, float('inf')) + struct.pack(fmt, float('nan'))
        assert read_fn(special, 0) == float('inf')
        assert math.isnan(read_fn(special, 4))

        # Error cases
        # Index past the end
        with assertRaises(IndexError, "index 20 out of range for bytes of length 12"):
            read_fn(data, 20 + int())
        # Index inside the data, but too few bytes left for an f32
        with assertRaises(IndexError, "index 9 out of range for bytes of length 12"):
            read_fn(data, 9 + int())

        # Negative index
        with assertRaises(ValueError, "index must be non-negative"):
            read_fn(data, -1 + int())

        # Wrong type for the bytes argument: str, then bytearray
        with assertRaises(TypeError):
            as_str: Any = "not bytes"
            read_fn(as_str, 0 + int())
        with assertRaises(TypeError):
            as_bytearray: Any = bytearray(b"\x00" * 4)
            read_fn(as_bytearray, 0 + int())
| |
def test_write_bytearray() -> None:
    # BytesWriter.write accepts bytearray chunks, including an empty one.
    writer = BytesWriter()
    for chunk in (bytearray(b"foobar"), bytearray(b""), bytearray(b"\x00\xf8")):
        writer.write(chunk)
    assert writer.getvalue() == b"foobar\x00\xf8"
| |
def test_cast_bytes_writer() -> None:
    # Assigning an Any value to a BytesWriter-typed variable succeeds for a
    # real BytesWriter and raises TypeError for any other object.
    untyped_writer: Any = BytesWriter()
    typed_writer: BytesWriter = untyped_writer
    assert typed_writer.getvalue() == b""

    untyped_str: Any = "x"
    with assertRaises(TypeError):
        typed_writer = untyped_str
| |
def test_bytes_writer_wrapper_functions() -> None:
    # Drives BytesWriter purely through Any-typed references: construction,
    # repr, len, append, write, item get/set, and the error cases of each.
    writer_type: Any = BytesWriter
    writer: Any = writer_type()
    assert repr(writer) == "BytesWriter(b'')"
    assert len(writer) == 0
    for byte in (ord('a'), 0, 255):
        writer.append(byte)
    writer.write(b"foo")
    writer.write(bytearray(b"bar"))
    assert writer.getvalue() == b"a\x00\xfffoobar"
    assert len(writer) == 9

    # Reads at both ends, using positive and negative indexes
    assert writer[0] == ord('a')
    assert writer[8] == ord('r')
    assert writer[-1] == ord('r')
    assert writer[-9] == ord('a')

    # Writes with positive indexes, then with negative ones
    writer[0] = 215
    writer[8] = 0
    assert writer[0] == 215
    assert writer[8] == 0
    writer[-1] = 1
    assert writer[8] == 1
    writer[-9] = 2
    assert writer[0] == 2

    assert isinstance(writer, writer_type)

    # Arguments of the wrong type
    with assertRaises(TypeError):
        writer.append("x")

    with assertRaises(TypeError):
        writer.write("foo")

    # Values outside the 0..255 byte range
    for bad_byte in (256, -1):
        with assertRaises(TypeError):
            writer.append(bad_byte)

    # Indexes one past either end, for reads and then for writes
    for bad_index in (9, -10):
        with assertRaises(IndexError):
            writer[bad_index]
    for bad_index in (9, -10):
        with assertRaises(IndexError):
            writer[bad_index] = 0

    # Item assignment applies the same byte-range check as append
    for bad_value in (-1, 256):
        with assertRaises(TypeError):
            writer[0] = bad_value
| |
def test_string_writer_basics() -> None:
    # A freshly constructed StringWriter holds the empty string.
    writer = StringWriter()
    initial = writer.getvalue()
    assert initial == ""
| |
def test_string_writer_repr() -> None:
    # repr() of a StringWriter for each of the three character widths.

    # Kind 1 (ASCII): empty writer, then two characters
    ascii_writer = StringWriter()
    assert repr(ascii_writer) == "StringWriter('')"
    for ch in "hi":
        ascii_writer.append(ord(ch))
    assert repr(ascii_writer) == "StringWriter('hi')"

    # Kind 2 (UCS-2)
    ucs2_writer = StringWriter()
    ucs2_writer.append(0x100)
    ucs2_writer.append(0x200)
    ucs2_expected = "StringWriter('" + chr(0x100) + chr(0x200) + "')"
    assert repr(ucs2_writer) == ucs2_expected

    # Kind 4 (UCS-4)
    ucs4_writer = StringWriter()
    ucs4_writer.append(0x10000)
    assert repr(ucs4_writer) == "StringWriter('" + chr(0x10000) + "')"
| |
def test_string_writer_repr_escaping() -> None:
    # repr() escapes control characters and backslashes in every kind.

    # Kind 1: newline, NUL, tab and backslash after a plain letter
    ascii_writer = StringWriter()
    for code in (ord('a'), ord('\n'), 0, ord('\t'), ord('\\')):
        ascii_writer.append(code)
    assert repr(ascii_writer) == "StringWriter('a\\n\\x00\\t\\\\')"

    # Kind 2: a UCS-2 character followed by a newline
    ucs2_writer = StringWriter()
    ucs2_writer.append(0x100)
    ucs2_writer.append(ord('\n'))
    assert repr(ucs2_writer) == "StringWriter('" + chr(0x100) + "\\n')"

    # Kind 4: a UCS-4 character followed by NUL
    ucs4_writer = StringWriter()
    ucs4_writer.append(0x10000)
    ucs4_writer.append(0)
    assert repr(ucs4_writer) == "StringWriter('" + chr(0x10000) + "\\x00')"
| |
def test_string_writer_len() -> None:
    # len() counts appended code points regardless of the writer's kind.

    # Kind 1 (ASCII)
    ascii_writer = StringWriter()
    assert len(ascii_writer) == 0
    ascii_writer.append(ord('a'))
    assert len(ascii_writer) == 1
    ascii_writer.append(ord('b'))
    ascii_writer.append(ord('c'))
    assert len(ascii_writer) == 3

    # Kind 2 (UCS-2)
    ucs2_writer = StringWriter()
    ucs2_writer.append(0x100)
    assert len(ucs2_writer) == 1
    for step in range(10):
        ucs2_writer.append(0x200 + step)
    assert len(ucs2_writer) == 11

    # Kind 4 (UCS-4)
    ucs4_writer = StringWriter()
    ucs4_writer.append(0x10000)
    assert len(ucs4_writer) == 1
    ucs4_writer.append(0x10001)
    ucs4_writer.append(0x10002)
    assert len(ucs4_writer) == 3

    # Length is still correct once the buffer has had to grow
    big_writer = StringWriter()
    for _ in range(500):
        big_writer.append(ord('X'))
    assert len(big_writer) == 500
| |
def test_string_writer_get_item() -> None:
    # Indexing a StringWriter returns code points. Positive and negative
    # indexes are checked for every kind, along with out-of-range errors.
    # Indexes keep the "+ int()" form so they are not literal constants.

    # Kind 1 (ASCII)
    ascii_writer = StringWriter()
    for ch in "foo":
        ascii_writer.append(ord(ch))
    for index, code in (
        (0, ord('f')), (1, ord('o')), (2, ord('o')),
        (-1, ord('o')), (-2, ord('o')), (-3, ord('f')),
    ):
        assert ascii_writer[index + int()] == code

    with assertRaises(IndexError):
        ascii_writer[3 + int()]
    with assertRaises(IndexError):
        ascii_writer[-4 + int()]
    # Very large magnitudes must also raise IndexError
    with assertRaises(IndexError):
        ascii_writer[1 << 50]
    with assertRaises(IndexError):
        ascii_writer[-(1 << 50)]

    # Kind 2 (UCS-2)
    ucs2_writer = StringWriter()
    for code in (0x100, 0x200, 0x300):
        ucs2_writer.append(code)
    for index, code in (
        (0, 0x100), (1, 0x200), (2, 0x300),
        (-1, 0x300), (-2, 0x200), (-3, 0x100),
    ):
        assert ucs2_writer[index + int()] == code

    with assertRaises(IndexError):
        ucs2_writer[3 + int()]
    with assertRaises(IndexError):
        ucs2_writer[-4 + int()]

    # Kind 4 (UCS-4)
    ucs4_writer = StringWriter()
    for code in (0x10000, 0x10001, 0x10002):
        ucs4_writer.append(code)
    for index, code in (
        (0, 0x10000), (1, 0x10001), (2, 0x10002),
        (-1, 0x10002), (-2, 0x10001), (-3, 0x10000),
    ):
        assert ucs4_writer[index + int()] == code

    with assertRaises(IndexError):
        ucs4_writer[3 + int()]
    with assertRaises(IndexError):
        ucs4_writer[-4 + int()]

    # Indexing still works after the buffer has grown
    big_writer = StringWriter()
    for n in range(1000):
        big_writer.append(ord('a') + (n % 26))
    assert big_writer[0 + int()] == ord('a')
    assert big_writer[999 + int()] == ord('a') + (999 % 26)
    assert big_writer[500 + int()] == ord('a') + (500 % 26)
    assert big_writer[-1 + int()] == ord('a') + (999 % 26)
    assert big_writer[-1000 + int()] == ord('a')
| |
def test_string_writer_append() -> None:
    # append() takes single code points, widening the writer's kind when
    # needed and rejecting values outside the Unicode range.
    writer = StringWriter()
    writer.append(ord('a'))
    assert writer.getvalue() == "a"
    writer.append(0xff)
    assert writer.getvalue() == "a\xff"

    # 0x100 does not fit kind 1, so this switches kind 1->2
    writer.append(0x100)
    assert writer.getvalue() == "a\xff\u0100", writer.getvalue()
    writer.append(0xffff)
    assert writer.getvalue() == "a\xff\u0100\uffff"

    # 0x10000 does not fit kind 2, so this switches kind 2->4
    writer.append(0x10000)
    assert writer.getvalue() == "a\xff\u0100\uffff" + chr(0x10000)

    # Maximum valid Unicode code point (0x10FFFF = 1114111)
    max_writer = StringWriter()
    max_writer.append(0x10FFFF)
    assert max_writer.getvalue() == chr(0x10FFFF)

    # Invalid code points: one past the maximum, negative, and far too large.
    # Each attempt uses a fresh writer.
    for bad_code, message in (
        (0x110000, "code point 1114112 is outside valid Unicode range (0-1114111)"),
        (-1, "code point -1 is outside valid Unicode range (0-1114111)"),
        (0x200000, "code point 2097152 is outside valid Unicode range (0-1114111)"),
    ):
        fresh_writer = StringWriter()
        with assertRaises(ValueError, message):
            fresh_writer.append(bad_code)
| |
def test_string_writer_write() -> None:
    # write() appends whole strings, widening the writer's kind as needed.

    # Kind 1: ASCII strings only
    ascii_writer = StringWriter()
    ascii_writer.write("hello")
    assert ascii_writer.getvalue() == "hello"
    ascii_writer.write(" world")
    assert ascii_writer.getvalue() == "hello world"

    # Writing the empty string leaves the contents unchanged
    ascii_writer.write("")
    assert ascii_writer.getvalue() == "hello world"

    # Kind 1 -> Kind 2: ASCII first, then UCS-2 characters, then ASCII again
    widened = StringWriter()
    widened.write("abc")
    assert widened.getvalue() == "abc"
    widened.write(chr(0x100) + chr(0x200))
    assert widened.getvalue() == "abc" + chr(0x100) + chr(0x200)
    widened.write("xyz")
    assert widened.getvalue() == "abc" + chr(0x100) + chr(0x200) + "xyz"

    # Kind 2: every character is UCS-2
    ucs2_writer = StringWriter()
    ucs2_writer.append(0x100)
    ucs2_writer.write(chr(0x200) + chr(0x300))
    assert ucs2_writer.getvalue() == chr(0x100) + chr(0x200) + chr(0x300)

    # Kind 2 -> Kind 4: a UCS-2 string followed by a UCS-4 string
    to_ucs4 = StringWriter()
    to_ucs4.write(chr(0x100))
    to_ucs4.write(chr(0x10000))
    assert to_ucs4.getvalue() == chr(0x100) + chr(0x10000)

    # Kind 4: narrower strings written into an already-wide writer
    mixed = StringWriter()
    mixed.append(0x10000)
    mixed.write("abc")
    mixed.write(chr(0x200))
    mixed.write(chr(0x10001))
    assert mixed.getvalue() == chr(0x10000) + "abc" + chr(0x200) + chr(0x10001)

    # Repeated writes, enough to trigger buffer growth
    repeated = StringWriter()
    for _ in range(100):
        repeated.write("hello")
    assert repeated.getvalue() == "hello" * 100
    assert len(repeated) == 500
| |
def test_string_writer_append_grow_same_kind() -> None:
    # Appends enough characters to force buffer growth while the writer
    # stays in one kind, checking the length each step and the final text.

    # Kind 1 (ASCII): grow beyond the embedded buffer
    ascii_writer = StringWriter()
    for n in range(1000):
        ascii_writer.append(ord('a') + (n % 26))
        assert len(ascii_writer) == n + 1
    ascii_text = ascii_writer.getvalue()
    assert len(ascii_text) == 1000
    for n, ch in enumerate(ascii_text):
        assert ch == chr(ord('a') + (n % 26))

    # Kind 2: the first append switches the kind, the loop then grows it
    ucs2_writer = StringWriter()
    ucs2_writer.append(0x100)
    for n in range(1000):
        ucs2_writer.append(0x100 + (n % 100))
        assert len(ucs2_writer) == n + 2
    ucs2_text = ucs2_writer.getvalue()
    assert len(ucs2_text) == 1001
    assert ucs2_text[0] == chr(0x100)
    for n, ch in enumerate(ucs2_text[1:]):
        assert ch == chr(0x100 + (n % 100))

    # Kind 4: same pattern with characters that need kind 4
    ucs4_writer = StringWriter()
    ucs4_writer.append(0x10000)
    for n in range(500):
        ucs4_writer.append(0x10000 + (n % 100))
        assert len(ucs4_writer) == n + 2
    ucs4_text = ucs4_writer.getvalue()
    assert len(ucs4_text) == 501
    assert ucs4_text[0] == chr(0x10000)
    for n, ch in enumerate(ucs4_text[1:]):
        assert ch == chr(0x10000 + (n % 100))
| |
def test_string_writer_append_grow_and_switch_kind() -> None:
    # Combines buffer growth with a change of kind: 1->2, 2->4, and 1->4.

    # Kind 1 -> 2: fill with ASCII to grow the buffer first
    one_to_two = StringWriter()
    for _ in range(500):
        one_to_two.append(ord('A'))
    assert len(one_to_two) == 500
    # Characters needing kind 2 now trigger both a grow and a kind switch
    for n in range(500):
        one_to_two.append(0x100 + n)
    text_1_2 = one_to_two.getvalue()
    assert len(text_1_2) == 1000
    for ch in text_1_2[:500]:
        assert ch == 'A'
    for n, ch in enumerate(text_1_2[500:]):
        assert ch == chr(0x100 + n)

    # Kind 2 -> 4: start in kind 2 and grow the buffer with kind 2 characters
    two_to_four = StringWriter()
    two_to_four.append(0x100)
    for n in range(300):
        two_to_four.append(0x200 + (n % 100))
    assert len(two_to_four) == 301
    # Characters needing kind 4 now trigger both a grow and a kind switch
    for n in range(300):
        two_to_four.append(0x10000 + n)
    text_2_4 = two_to_four.getvalue()
    assert len(text_2_4) == 601
    assert text_2_4[0] == chr(0x100)
    for n, ch in enumerate(text_2_4[1:301]):
        assert ch == chr(0x200 + (n % 100))
    for n, ch in enumerate(text_2_4[301:]):
        assert ch == chr(0x10000 + n)

    # Kind 1 -> 4: grow with ASCII, then jump directly to kind 4
    one_to_four = StringWriter()
    for _ in range(300):
        one_to_four.append(ord('X'))
    one_to_four.append(0x10000)
    text_1_4 = one_to_four.getvalue()
    assert len(text_1_4) == 301
    for ch in text_1_4[:300]:
        assert ch == 'X'
    assert text_1_4[300] == chr(0x10000)
| |
def test_string_writer_wrapper_functions() -> None:
    # Drives StringWriter purely through Any-typed references: construction,
    # repr, len, append, write, item access, and the error cases of each.
    writer_type: Any = StringWriter
    writer: Any = writer_type()
    assert repr(writer) == "StringWriter('')"
    assert len(writer) == 0
    # One code point for each kind, followed by a plain string
    for code in (ord('a'), 0x100, 0x10000):
        writer.append(code)
    writer.write("foo")
    assert writer.getvalue() == "a" + chr(0x100) + chr(0x10000) + "foo"
    assert len(writer) == 6

    assert writer[0] == ord('a')
    assert writer[1] == 0x100
    assert writer[2] == 0x10000
    assert writer[-1] == ord('o')
    assert writer[-6] == ord('a')

    assert isinstance(writer, writer_type)

    # Arguments of the wrong type
    with assertRaises(TypeError):
        writer.append("x")

    with assertRaises(TypeError):
        writer.write(b"foo")

    # Code points outside the valid Unicode range
    for bad_code in (0x110000, -1):
        with assertRaises(ValueError):
            writer.append(bad_code)

    # Indexes one past either end
    for bad_index in (6, -7):
        with assertRaises(IndexError):
            writer[bad_index]
| |
def test_explicit_init_resets_via_any() -> None:
    # An explicit __init__ call on an already-initialized writer resets it.
    # Any previously-allocated heap buffer must be freed to avoid leaks.

    # BytesWriter holding a small amount of data
    small_bytes: Any = BytesWriter()
    small_bytes.write(b"hello")
    small_bytes.__init__()
    assert small_bytes.getvalue() == b""
    small_bytes.write(b"world")
    assert small_bytes.getvalue() == b"world"

    # BytesWriter after a large write, so a heap-allocated buffer exists
    # before the explicit __init__ call: the path that would otherwise leak.
    large_bytes: Any = BytesWriter()
    large_bytes.write(b"x" * 10000)
    large_bytes.__init__()
    assert large_bytes.getvalue() == b""
    large_bytes.write(b"short")
    assert large_bytes.getvalue() == b"short"

    # The same two scenarios for StringWriter
    small_str: Any = StringWriter()
    small_str.write("hello")
    small_str.__init__()
    assert small_str.getvalue() == ""
    small_str.write("world")
    assert small_str.getvalue() == "world"

    large_str: Any = StringWriter()
    large_str.write("y" * 10000)
    large_str.__init__()
    assert large_str.getvalue() == ""
    large_str.write("short")
    assert large_str.getvalue() == "short"
| |
def test_new_without_init_is_usable() -> None:
    # Objects created with __new__ alone must already be fully initialized
    # and usable, without any explicit __init__ call.

    # BytesWriter: empty at first, then holds what was written
    bw: Any = BytesWriter.__new__(BytesWriter)
    initial_bytes = bw.getvalue()
    assert initial_bytes == b""
    bw.write(b"hello")
    written_bytes = bw.getvalue()
    assert written_bytes == b"hello"

    # StringWriter: same expectations
    sw: Any = StringWriter.__new__(StringWriter)
    initial_text = sw.getvalue()
    assert initial_text == ""
    sw.write("hello")
    written_text = sw.getvalue()
    assert written_text == "hello"