blob: 909766d5c8e7423dad36afd86d391313c1cc6102 [file] [edit]
[case testLibrtStrings_librt]
from typing import Any
import base64
import binascii
import random
import struct
from librt.strings import BytesWriter, StringWriter, write_i16_le, write_i16_be, read_i16_le, read_i16_be, write_i32_le, write_i32_be, read_i32_le, read_i32_be, write_i64_le, write_i64_be, read_i64_le, read_i64_be, write_f64_le, read_f64_le, write_f64_be, read_f64_be, write_f32_le, write_f32_be, read_f32_le, read_f32_be
from testutil import assertRaises
# Test values for i16 write/read operations, organized by byte size
I16_TEST_VALUES: list[int] = [
# 1-byte values (0 to 127, and -128 to -1 in terms of low byte pattern)
0, 1, 127, -1, -113, -128, # -113 is mypyc overlapping error value
# 2-byte values (128 to 32767, -32768 to -129)
128, 255, 256, 0x1234, 32767, -129, -256, -32768,
]
# Test values for i32 write/read operations: i16 test values plus 3-4 byte values
I32_TEST_VALUES: list[int] = I16_TEST_VALUES + [
# 3-byte values (32768 to 8388607, -8388608 to -32769)
32768, 65535, 65536, 8388607, -32769, -65536, -8388608,
# 4-byte values (8388608 to 2147483647, -2147483648 to -8388609)
8388608, 16777215, 16777216, 2147483647, -8388609, -16777216, -2147483648,
]
# Test values for i64 write/read operations: i32 test values plus 5-8 byte values
I64_TEST_VALUES: list[int] = I32_TEST_VALUES + [
# 5-8 byte values (beyond i32 range)
2147483648, # 2^31, just above i32 max
4294967295, # 2^32 - 1
4294967296, # 2^32
1099511627775, # 2^40 - 1
281474976710655, # 2^48 - 1
72057594037927935, # 2^56 - 1
9223372036854775807, # max i64
-2147483649, # just below i32 min
-4294967296, # -2^32
-9223372036854775808, # min i64
]
# Test values for f64 write/read operations
F64_TEST_VALUES: list[float] = [
0.0, -0.0, 1.0, -1.0,
-113.0, # mypyc overlapping error value
0.5, -0.5, 0.1, -0.1,
1.5, 255.0, 256.0,
1e10, -1e10, 1e100, -1e100,
1e-10, 1e-100,
1.7976931348623157e+308, # max float
-1.7976931348623157e+308, # min float
2.2250738585072014e-308, # min positive normal
5e-324, # min positive subnormal
float('inf'), float('-inf'),
float('nan'),
]
# Test values for f32 write/read operations (values that fit in 32-bit float)
F32_TEST_VALUES: list[float] = [
0.0, -0.0, 1.0, -1.0,
-113.0, # mypyc overlapping error value
0.5, -0.5, 0.1, -0.1,
1.5, 255.0, 256.0,
1e10, -1e10,
1e-10,
3.4028235e+38, # max float32
-3.4028235e+38, # min float32
1.1754944e-38, # min positive normal float32
float('inf'), float('-inf'),
float('nan'),
]
def test_bytes_writer_basics() -> None:
w = BytesWriter()
assert w.getvalue() == b""
assert len(w) == 0
assert repr(w) == "BytesWriter(b'')"
w = BytesWriter()
w.append(ord('a'))
w.write(b'bc')
assert w.getvalue() == b"abc"
assert repr(w) == "BytesWriter(b'abc')"
def test_bytes_writer_get_item() -> None:
w = BytesWriter()
w.write(b"foobar")
assert w[0] == ord("f")
assert w[5] == ord("r")
assert w[-1] == ord("r")
assert w[-2] == ord("a")
assert w[-6] == ord("f")
with assertRaises(IndexError):
w[6]
with assertRaises(IndexError):
w[-7]
with assertRaises(IndexError):
w[1 << 50]
with assertRaises(IndexError):
w[-(1 << 50)]
def test_bytes_writer_set_item() -> None:
w = BytesWriter()
w.write(b"foobar")
w[0] = 255
assert w[0] == 255
w[5] = 0
assert w[5] == 0
assert w.getvalue() == b"\xffooba\x00"
with assertRaises(IndexError):
w[6] = 0
with assertRaises(IndexError):
w[-7] = 0
with assertRaises(IndexError):
w[1 << 50] = 0
with assertRaises(IndexError):
w[-(1 << 50)] = 0
with assertRaises(ValueError):
w[0] = int() - 1
with assertRaises(ValueError):
w[0] = int() + 256
# Grow BytesWriter
w.write(b"xy" * 512)
w[1024 + 5] = 66
assert w[1024 + 5] == 66
assert w[0] == 255
def test_bytes_writer_append_grow() -> None:
w = BytesWriter()
for i in range(16384):
w.append((i ^ (i >> 8)) & 255)
assert len(w) == i + 1
b = w.getvalue()
for i in range(16384):
assert b[i] == (i ^ (i >> 8)) & 255
def test_bytes_writer_write_grow() -> None:
w = BytesWriter()
for i in range(16384):
w.write(bytes([(i ^ (i >> 8)) & 255]))
b = w.getvalue()
for i in range(16384):
assert b[i] == (i ^ (i >> 8)) & 255
assert b[i] == w[i]
w = BytesWriter()
a = []
for i in range(16384):
w.write(bytes())
if i & 1 == 0:
segment = b"foobarz"
else:
segment = b"\x7f\x00ab!"
w.write(segment)
a.append(segment)
assert w.getvalue() == b"".join(a)
def test_bytes_writer_truncate() -> None:
b = BytesWriter()
b.write(b"foobar")
b.truncate(6) # No-op
assert len(b) == 6
assert b.getvalue() == b"foobar"
b.truncate(5)
assert len(b) == 5
assert b.getvalue() == b"fooba"
b.truncate(0)
assert len(b) == 0
assert b.getvalue() == b""
with assertRaises(ValueError):
b.truncate(1)
b = BytesWriter()
b.write(b"foobar")
with assertRaises(ValueError):
b.truncate(-1)
def test_bytes_writer_write_i16_le() -> None:
# Test various i16 values from 1-byte to 2-byte range
w = BytesWriter()
for v in I16_TEST_VALUES:
write_i16_le(w, v)
assert w.getvalue() == b"".join(struct.pack("<h", v) for v in I16_TEST_VALUES)
# Test mixing with other operations and buffer growth
w = BytesWriter()
w.append(0xFF)
for i in range(1000):
write_i16_le(w, i)
w.append(0xEE)
result = w.getvalue()
assert len(result) == 2002
assert result[0] == 0xFF
assert result[-1] == 0xEE
# Check a few values in the middle
assert result[1:3] == b"\x00\x00" # i=0
assert result[3:5] == b"\x01\x00" # i=1
assert result[1999:2001] == b"\xe7\x03" # i=999
def test_write_i16_via_any() -> None:
# Test write_i16_le/be via Any to ensure C extension wrapper works
# (tests fallback path when not using mypyc primitives)
for write_func, fmt in zip((write_i16_le, write_i16_be), ("<h", ">h")):
w: Any = BytesWriter()
# Test 8-bit and 16-bit operations
w.append(0x42)
write_func(w, 0x1234)
w.append(0xFF)
assert w.getvalue() == b"\x42" + struct.pack(fmt, 0x1234) + b"\xFF"
# Test buffer growth
w2: Any = BytesWriter()
for i in range(150):
write_func(w2, i)
result = w2.getvalue()
assert len(result) == 300
assert result[0:2] == struct.pack(fmt, 0) # i=0
assert result[2:4] == struct.pack(fmt, 1) # i=1
assert result[298:300] == struct.pack(fmt, 149) # i=149
# Test values that don't fit in i16
w3: Any = BytesWriter()
with assertRaises(ValueError, "int too large to convert to i16"):
write_func(w3, 32768 + int())
with assertRaises(ValueError, "int too large to convert to i16"):
write_func(w3, -32769 + int())
with assertRaises(ValueError, "int too large to convert to i16"):
write_func(w3, 100000 + int())
def test_bytes_reader_read_i16_le() -> None:
# Test various i16 values from 1-byte to 2-byte range
data = b"".join(struct.pack("<h", v) for v in I16_TEST_VALUES)
for i, v in enumerate(I16_TEST_VALUES):
assert read_i16_le(data, i * 2) == v
# Test round-trip with write_i16_le
w = BytesWriter()
for v in I16_TEST_VALUES:
write_i16_le(w, v)
result = w.getvalue()
for i, v in enumerate(I16_TEST_VALUES):
assert read_i16_le(result, i * 2) == v
# Test unaligned offset
data2 = b"\xFF" + struct.pack("<h", 0x1234) + b"\xFF"
assert read_i16_le(data2, 1) == 0x1234
# Test barely out of bounds (only 1 byte available, need 2)
with assertRaises(IndexError):
read_i16_le(data, len(data) - 1)
def test_bytes_reader_read_i16_be() -> None:
# Test various i16 values from 1-byte to 2-byte range
data = b"".join(struct.pack(">h", v) for v in I16_TEST_VALUES)
for i, v in enumerate(I16_TEST_VALUES):
assert read_i16_be(data, i * 2) == v
# Test round-trip with write_i16_be
w = BytesWriter()
for v in I16_TEST_VALUES:
write_i16_be(w, v)
result = w.getvalue()
for i, v in enumerate(I16_TEST_VALUES):
assert read_i16_be(result, i * 2) == v
# Test unaligned offset
data2 = b"\xFF" + struct.pack(">h", 0x1234) + b"\xFF"
assert read_i16_be(data2, 1) == 0x1234
# Test barely out of bounds (only 1 byte available, need 2)
with assertRaises(IndexError):
read_i16_be(data, len(data) - 1)
def test_read_i16_via_any() -> None:
# Test read_i16_le/be via Any to ensure C extension wrapper works
for read_func, fmt in zip((read_i16_le, read_i16_be), ("<h", ">h")):
data: Any = struct.pack(fmt, 0x1234) + struct.pack(fmt, -1) + struct.pack(fmt, 0)
assert read_func(data, 0) == 0x1234
assert read_func(data, 2) == -1
assert read_func(data, 4) == 0
# Test error cases
# Index out of range
with assertRaises(IndexError, "index 10 out of range for bytes of length 6"):
read_func(data, 10 + int())
with assertRaises(IndexError, "index 5 out of range for bytes of length 6"):
read_func(data, 5 + int()) # Not enough bytes for i16
# Negative index
with assertRaises(ValueError, "index must be non-negative"):
read_func(data, -1 + int())
# Wrong type for bytes argument
with assertRaises(TypeError):
bad: Any = "not bytes"
read_func(bad, 0 + int())
with assertRaises(TypeError):
bad2: Any = bytearray(b"\x00\x00")
read_func(bad2, 0 + int())
def test_bytes_writer_write_i16_be() -> None:
# Test various i16 values from 1-byte to 2-byte range
w = BytesWriter()
for v in I16_TEST_VALUES:
write_i16_be(w, v)
assert w.getvalue() == b"".join(struct.pack(">h", v) for v in I16_TEST_VALUES)
# Test mixing with other operations and buffer growth
w = BytesWriter()
w.append(0xFF)
for i in range(1000):
write_i16_be(w, i)
w.append(0xEE)
result = w.getvalue()
assert len(result) == 2002
assert result[0] == 0xFF
assert result[-1] == 0xEE
# Check a few values in the middle
assert result[1:3] == b"\x00\x00" # i=0
assert result[3:5] == b"\x00\x01" # i=1
assert result[1999:2001] == b"\x03\xe7" # i=999
def test_bytes_writer_write_i32_le() -> None:
# Test various i32 values from 1-byte to 4-byte range
w = BytesWriter()
for v in I32_TEST_VALUES:
write_i32_le(w, v)
assert w.getvalue() == b"".join(struct.pack("<i", v) for v in I32_TEST_VALUES)
# Test mixing with other operations and buffer growth
w = BytesWriter()
w.append(0xFF)
for i in range(500):
write_i32_le(w, i * 1000)
w.append(0xEE)
result = w.getvalue()
assert len(result) == 2002
assert result[0] == 0xFF
assert result[-1] == 0xEE
# Check a few values in the middle
assert result[1:5] == struct.pack("<i", 0)
assert result[5:9] == struct.pack("<i", 1000)
assert result[1997:2001] == struct.pack("<i", 499000)
def test_bytes_writer_write_i32_be() -> None:
# Test various i32 values from 1-byte to 4-byte range
w = BytesWriter()
for v in I32_TEST_VALUES:
write_i32_be(w, v)
assert w.getvalue() == b"".join(struct.pack(">i", v) for v in I32_TEST_VALUES)
# Test mixing with other operations and buffer growth
w = BytesWriter()
w.append(0xFF)
for i in range(500):
write_i32_be(w, i * 1000)
w.append(0xEE)
result = w.getvalue()
assert len(result) == 2002
assert result[0] == 0xFF
assert result[-1] == 0xEE
# Check a few values in the middle
assert result[1:5] == struct.pack(">i", 0)
assert result[5:9] == struct.pack(">i", 1000)
assert result[1997:2001] == struct.pack(">i", 499000)
def test_write_i32_via_any() -> None:
# Test write_i32_le/be via Any to ensure C extension wrapper works
# (tests fallback path when not using mypyc primitives)
for write_func, fmt in zip((write_i32_le, write_i32_be), ("<i", ">i")):
w: Any = BytesWriter()
# Test 8-bit and 32-bit operations
w.append(0x42)
write_func(w, 0x12345678)
w.append(0xFF)
assert w.getvalue() == b"\x42" + struct.pack(fmt, 0x12345678) + b"\xFF"
# Test buffer growth
w2: Any = BytesWriter()
for i in range(100):
write_func(w2, i * 10000)
result = w2.getvalue()
assert len(result) == 400
assert result[0:4] == struct.pack(fmt, 0)
assert result[4:8] == struct.pack(fmt, 10000)
assert result[396:400] == struct.pack(fmt, 990000)
# Test values that don't fit in i32
w3: Any = BytesWriter()
with assertRaises(ValueError, "int too large to convert to i32"):
write_func(w3, 2147483648 + int())
with assertRaises(ValueError, "int too large to convert to i32"):
write_func(w3, -2147483649 + int())
with assertRaises(ValueError, "int too large to convert to i32"):
write_func(w3, 10000000000 + int())
def test_bytes_reader_read_i32_le() -> None:
# Test various i32 values from 1-byte to 4-byte range
data = b"".join(struct.pack("<i", v) for v in I32_TEST_VALUES)
for i, v in enumerate(I32_TEST_VALUES):
assert read_i32_le(data, i * 4) == v
# Test round-trip with write_i32_le
w = BytesWriter()
for v in I32_TEST_VALUES:
write_i32_le(w, v)
result = w.getvalue()
for i, v in enumerate(I32_TEST_VALUES):
assert read_i32_le(result, i * 4) == v
# Test unaligned offset
data2 = b"\xFF" + struct.pack("<i", 0x12345678) + b"\xFF"
assert read_i32_le(data2, 1) == 0x12345678
# Test barely out of bounds (only 3 bytes available, need 4)
with assertRaises(IndexError):
read_i32_le(data, len(data) - 3)
def test_bytes_reader_read_i32_be() -> None:
# Test various i32 values from 1-byte to 4-byte range
data = b"".join(struct.pack(">i", v) for v in I32_TEST_VALUES)
for i, v in enumerate(I32_TEST_VALUES):
assert read_i32_be(data, i * 4) == v
# Test round-trip with write_i32_be
w = BytesWriter()
for v in I32_TEST_VALUES:
write_i32_be(w, v)
result = w.getvalue()
for i, v in enumerate(I32_TEST_VALUES):
assert read_i32_be(result, i * 4) == v
# Test unaligned offset
data2 = b"\xFF" + struct.pack(">i", 0x12345678) + b"\xFF"
assert read_i32_be(data2, 1) == 0x12345678
# Test barely out of bounds (only 3 bytes available, need 4)
with assertRaises(IndexError):
read_i32_be(data, len(data) - 3)
def test_read_i32_via_any() -> None:
# Test read_i32_le/be via Any to ensure C extension wrapper works
for read_func, fmt in zip((read_i32_le, read_i32_be), ("<i", ">i")):
data: Any = struct.pack(fmt, 0x12345678) + struct.pack(fmt, -1) + struct.pack(fmt, 0)
assert read_func(data, 0) == 0x12345678
assert read_func(data, 4) == -1
assert read_func(data, 8) == 0
# Test error cases
# Index out of range
with assertRaises(IndexError, "index 20 out of range for bytes of length 12"):
read_func(data, 20 + int())
with assertRaises(IndexError, "index 9 out of range for bytes of length 12"):
read_func(data, 9 + int()) # Not enough bytes for i32
# Negative index
with assertRaises(ValueError, "index must be non-negative"):
read_func(data, -1 + int())
# Wrong type for bytes argument
with assertRaises(TypeError):
bad: Any = "not bytes"
read_func(bad, 0 + int())
with assertRaises(TypeError):
bad2: Any = bytearray(b"\x00\x00\x00\x00")
read_func(bad2, 0 + int())
def test_bytes_writer_write_i64_le() -> None:
# Test all i64 values (includes all i32 values plus 5-8 byte values)
w = BytesWriter()
for v in I64_TEST_VALUES:
write_i64_le(w, v)
assert w.getvalue() == b"".join(struct.pack("<q", v) for v in I64_TEST_VALUES)
# Test mixing with other operations and buffer growth
w = BytesWriter()
w.append(0xFF)
for i in range(200):
write_i64_le(w, i * 1000000000)
w.append(0xEE)
result = w.getvalue()
assert len(result) == 1602
assert result[0] == 0xFF
assert result[-1] == 0xEE
assert result[1:9] == struct.pack("<q", 0)
assert result[9:17] == struct.pack("<q", 1000000000)
assert result[1593:1601] == struct.pack("<q", 199000000000)
def test_bytes_writer_write_i64_be() -> None:
# Test all i64 values (includes all i32 values plus 5-8 byte values)
w = BytesWriter()
for v in I64_TEST_VALUES:
write_i64_be(w, v)
assert w.getvalue() == b"".join(struct.pack(">q", v) for v in I64_TEST_VALUES)
# Test mixing with other operations and buffer growth
w = BytesWriter()
w.append(0xFF)
for i in range(200):
write_i64_be(w, i * 1000000000)
w.append(0xEE)
result = w.getvalue()
assert len(result) == 1602
assert result[0] == 0xFF
assert result[-1] == 0xEE
assert result[1:9] == struct.pack(">q", 0)
assert result[9:17] == struct.pack(">q", 1000000000)
assert result[1593:1601] == struct.pack(">q", 199000000000)
def test_write_i64_via_any() -> None:
# Test write_i64_le/be via Any to ensure C extension wrapper works
for write_func, fmt in zip((write_i64_le, write_i64_be), ("<q", ">q")):
w: Any = BytesWriter()
w.append(0x42)
write_func(w, 0x123456789ABCDEF0)
w.append(0xFF)
assert w.getvalue() == b"\x42" + struct.pack(fmt, 0x123456789ABCDEF0) + b"\xFF"
# Test buffer growth
w2: Any = BytesWriter()
for i in range(50):
write_func(w2, i * 10000000000)
result = w2.getvalue()
assert len(result) == 400
assert result[0:8] == struct.pack(fmt, 0)
assert result[8:16] == struct.pack(fmt, 10000000000)
assert result[392:400] == struct.pack(fmt, 490000000000)
def test_bytes_reader_read_i64_le() -> None:
# Test all i64 values (includes all i32 values plus 5-8 byte values)
data = b"".join(struct.pack("<q", v) for v in I64_TEST_VALUES)
for i, v in enumerate(I64_TEST_VALUES):
assert read_i64_le(data, i * 8) == v
# Test round-trip with write_i64_le
w = BytesWriter()
for v in I64_TEST_VALUES:
write_i64_le(w, v)
result = w.getvalue()
for i, v in enumerate(I64_TEST_VALUES):
assert read_i64_le(result, i * 8) == v
# Test unaligned offset
data2 = b"\xFF" + struct.pack("<q", 0x123456789ABCDEF0) + b"\xFF"
assert read_i64_le(data2, 1) == 0x123456789ABCDEF0
# Test barely out of bounds (only 7 bytes available, need 8)
with assertRaises(IndexError):
read_i64_le(data, len(data) - 7)
def test_bytes_reader_read_i64_be() -> None:
# Test all i64 values (includes all i32 values plus 5-8 byte values)
data = b"".join(struct.pack(">q", v) for v in I64_TEST_VALUES)
for i, v in enumerate(I64_TEST_VALUES):
assert read_i64_be(data, i * 8) == v
# Test round-trip with write_i64_be
w = BytesWriter()
for v in I64_TEST_VALUES:
write_i64_be(w, v)
result = w.getvalue()
for i, v in enumerate(I64_TEST_VALUES):
assert read_i64_be(result, i * 8) == v
# Test unaligned offset
data2 = b"\xFF" + struct.pack(">q", 0x123456789ABCDEF0) + b"\xFF"
assert read_i64_be(data2, 1) == 0x123456789ABCDEF0
# Test barely out of bounds (only 7 bytes available, need 8)
with assertRaises(IndexError):
read_i64_be(data, len(data) - 7)
def test_read_i64_via_any() -> None:
# Test read_i64_le/be via Any to ensure C extension wrapper works
for read_func, fmt in zip((read_i64_le, read_i64_be), ("<q", ">q")):
data: Any = struct.pack(fmt, 0x123456789ABCDEF0) + struct.pack(fmt, -1) + struct.pack(fmt, 0)
assert read_func(data, 0) == 0x123456789ABCDEF0
assert read_func(data, 8) == -1
assert read_func(data, 16) == 0
# Test error cases
# Index out of range
with assertRaises(IndexError, "index 30 out of range for bytes of length 24"):
read_func(data, 30 + int())
with assertRaises(IndexError, "index 17 out of range for bytes of length 24"):
read_func(data, 17 + int()) # Not enough bytes for i64
# Negative index
with assertRaises(ValueError, "index must be non-negative"):
read_func(data, -1 + int())
# Wrong type for bytes argument
with assertRaises(TypeError):
bad: Any = "not bytes"
read_func(bad, 0 + int())
with assertRaises(TypeError):
bad2: Any = bytearray(b"\x00" * 8)
read_func(bad2, 0 + int())
def test_bytes_writer_write_f64_le() -> None:
# Test various f64 values
w = BytesWriter()
for v in F64_TEST_VALUES:
write_f64_le(w, v)
result = w.getvalue()
expected = b"".join(struct.pack("<d", v) for v in F64_TEST_VALUES)
# Compare byte-by-byte (NaN != NaN but bytes should match)
assert result == expected
# Test mixing with other operations and buffer growth
w = BytesWriter()
w.append(0xFF)
for i in range(200):
write_f64_le(w, float(i) * 1.5)
w.append(0xEE)
result = w.getvalue()
assert len(result) == 1602
assert result[0] == 0xFF
assert result[-1] == 0xEE
assert result[1:9] == struct.pack("<d", 0.0)
assert result[9:17] == struct.pack("<d", 1.5)
assert result[1593:1601] == struct.pack("<d", 199.0 * 1.5)
def test_bytes_writer_write_f64_be() -> None:
# Test various f64 values
w = BytesWriter()
for v in F64_TEST_VALUES:
write_f64_be(w, v)
result = w.getvalue()
expected = b"".join(struct.pack(">d", v) for v in F64_TEST_VALUES)
# Compare byte-by-byte (NaN != NaN but bytes should match)
assert result == expected
# Test mixing with other operations and buffer growth
w = BytesWriter()
w.append(0xFF)
for i in range(200):
write_f64_be(w, float(i) * 1.5)
w.append(0xEE)
result = w.getvalue()
assert len(result) == 1602
assert result[0] == 0xFF
assert result[-1] == 0xEE
assert result[1:9] == struct.pack(">d", 0.0)
assert result[9:17] == struct.pack(">d", 1.5)
assert result[1593:1601] == struct.pack(">d", 199.0 * 1.5)
def test_bytes_reader_read_f64_le() -> None:
import math
# Test various f64 values
data = b"".join(struct.pack("<d", v) for v in F64_TEST_VALUES)
for i, v in enumerate(F64_TEST_VALUES):
result = read_f64_le(data, i * 8)
if math.isnan(v):
assert math.isnan(result)
else:
assert result == v
# Test round-trip with write_f64_le
w = BytesWriter()
for v in F64_TEST_VALUES:
write_f64_le(w, v)
result_bytes = w.getvalue()
for i, v in enumerate(F64_TEST_VALUES):
result = read_f64_le(result_bytes, i * 8)
if math.isnan(v):
assert math.isnan(result)
else:
assert result == v
# Test unaligned offset
data2 = b"\xFF" + struct.pack("<d", 1.5) + b"\xFF"
assert read_f64_le(data2, 1) == 1.5
# Test barely out of bounds (only 7 bytes available, need 8)
with assertRaises(IndexError):
read_f64_le(data, len(data) - 7)
def test_bytes_reader_read_f64_be() -> None:
import math
# Test various f64 values
data = b"".join(struct.pack(">d", v) for v in F64_TEST_VALUES)
for i, v in enumerate(F64_TEST_VALUES):
result = read_f64_be(data, i * 8)
if math.isnan(v):
assert math.isnan(result)
else:
assert result == v
# Test round-trip with write_f64_be
w = BytesWriter()
for v in F64_TEST_VALUES:
write_f64_be(w, v)
result_bytes = w.getvalue()
for i, v in enumerate(F64_TEST_VALUES):
result = read_f64_be(result_bytes, i * 8)
if math.isnan(v):
assert math.isnan(result)
else:
assert result == v
# Test unaligned offset
data2 = b"\xFF" + struct.pack(">d", 1.5) + b"\xFF"
assert read_f64_be(data2, 1) == 1.5
# Test barely out of bounds (only 7 bytes available, need 8)
with assertRaises(IndexError):
read_f64_be(data, len(data) - 7)
def test_read_f64_via_any() -> None:
import math
# Test read_f64_le/be via Any to ensure C extension wrapper works
for read_func, fmt in zip((read_f64_le, read_f64_be), ("<d", ">d")):
data: Any = struct.pack(fmt, 1.5) + struct.pack(fmt, -113.0) + struct.pack(fmt, 0.0)
assert read_func(data, 0) == 1.5
assert read_func(data, 8) == -113.0
assert read_func(data, 16) == 0.0
# Test special values
data2: Any = struct.pack(fmt, float('inf')) + struct.pack(fmt, float('nan'))
assert read_func(data2, 0) == float('inf')
assert math.isnan(read_func(data2, 8))
# Test error cases
# Index out of range
with assertRaises(IndexError, "index 30 out of range for bytes of length 24"):
read_func(data, 30 + int())
with assertRaises(IndexError, "index 17 out of range for bytes of length 24"):
read_func(data, 17 + int()) # Not enough bytes for f64
# Negative index
with assertRaises(ValueError, "index must be non-negative"):
read_func(data, -1 + int())
# Wrong type for bytes argument
with assertRaises(TypeError):
bad: Any = "not bytes"
read_func(bad, 0 + int())
with assertRaises(TypeError):
bad2: Any = bytearray(b"\x00" * 8)
read_func(bad2, 0 + int())
def test_write_f64_via_any() -> None:
import math
# Test write_f64_le/be via Any to ensure C extension wrapper works
for write_func, fmt in zip((write_f64_le, write_f64_be), ("<d", ">d")):
w: Any = BytesWriter()
w.append(0x42)
write_func(w, 1.5)
w.append(0xFF)
assert w.getvalue() == b"\x42" + struct.pack(fmt, 1.5) + b"\xFF"
# Test buffer growth
w2: Any = BytesWriter()
for i in range(50):
write_func(w2, float(i) * 0.25)
result = w2.getvalue()
assert len(result) == 400
assert result[0:8] == struct.pack(fmt, 0.0)
assert result[8:16] == struct.pack(fmt, 0.25)
assert result[392:400] == struct.pack(fmt, 49.0 * 0.25)
# Test special values
w3: Any = BytesWriter()
write_func(w3, float('inf'))
write_func(w3, float('-inf'))
write_func(w3, float('nan'))
result = w3.getvalue()
assert result[0:8] == struct.pack(fmt, float('inf'))
assert result[8:16] == struct.pack(fmt, float('-inf'))
assert math.isnan(struct.unpack(fmt, result[16:24])[0])
# Test wrong type
w4: Any = BytesWriter()
with assertRaises(TypeError):
bad: Any = "not a float" + str()
write_func(w4, bad)
def test_bytes_writer_write_f32_le() -> None:
import struct as s
# Test various f32 values
w = BytesWriter()
for v in F32_TEST_VALUES:
write_f32_le(w, v)
result = w.getvalue()
expected = b"".join(s.pack("<f", v) for v in F32_TEST_VALUES)
# Compare byte-by-byte (NaN != NaN but bytes should match)
assert result == expected
# Test mixing with other operations and buffer growth
w = BytesWriter()
w.append(0xFF)
for i in range(200):
write_f32_le(w, float(i) * 1.5)
w.append(0xEE)
result = w.getvalue()
assert len(result) == 802
assert result[0] == 0xFF
assert result[-1] == 0xEE
assert result[1:5] == s.pack("<f", 0.0)
assert result[5:9] == s.pack("<f", 1.5)
assert result[797:801] == s.pack("<f", 199.0 * 1.5)
def test_bytes_writer_write_f32_be() -> None:
import struct as s
# Test various f32 values
w = BytesWriter()
for v in F32_TEST_VALUES:
write_f32_be(w, v)
result = w.getvalue()
expected = b"".join(s.pack(">f", v) for v in F32_TEST_VALUES)
# Compare byte-by-byte (NaN != NaN but bytes should match)
assert result == expected
# Test mixing with other operations and buffer growth
w = BytesWriter()
w.append(0xFF)
for i in range(200):
write_f32_be(w, float(i) * 1.5)
w.append(0xEE)
result = w.getvalue()
assert len(result) == 802
assert result[0] == 0xFF
assert result[-1] == 0xEE
assert result[1:5] == s.pack(">f", 0.0)
assert result[5:9] == s.pack(">f", 1.5)
assert result[797:801] == s.pack(">f", 199.0 * 1.5)
def test_write_f32_via_any() -> None:
import math
# Test write_f32_le/be via Any to ensure C extension wrapper works
for write_func, fmt in zip((write_f32_le, write_f32_be), ("<f", ">f")):
w: Any = BytesWriter()
w.append(0x42)
write_func(w, 1.5)
w.append(0xFF)
assert w.getvalue() == b"\x42" + struct.pack(fmt, 1.5) + b"\xFF"
# Test buffer growth
w2: Any = BytesWriter()
for i in range(100):
write_func(w2, float(i) * 0.25)
result = w2.getvalue()
assert len(result) == 400
assert result[0:4] == struct.pack(fmt, 0.0)
assert result[4:8] == struct.pack(fmt, 0.25)
assert result[396:400] == struct.pack(fmt, 99.0 * 0.25)
# Test special values
w3: Any = BytesWriter()
write_func(w3, float('inf'))
write_func(w3, float('-inf'))
write_func(w3, float('nan'))
result = w3.getvalue()
assert result[0:4] == struct.pack(fmt, float('inf'))
assert result[4:8] == struct.pack(fmt, float('-inf'))
assert math.isnan(struct.unpack(fmt, result[8:12])[0])
# Test wrong type
w4: Any = BytesWriter()
with assertRaises(TypeError):
bad: Any = "not a float" + str()
write_func(w4, bad)
def test_bytes_reader_read_f32_le() -> None:
import math
import struct as s
# Test various f32 values (compare against f32-rounded expected value,
# since some f64 values like 0.1 lose precision when stored as f32)
data = b"".join(s.pack("<f", v) for v in F32_TEST_VALUES)
for i, v in enumerate(F32_TEST_VALUES):
result = read_f32_le(data, i * 4)
expected = s.unpack("<f", s.pack("<f", v))[0]
if math.isnan(v):
assert math.isnan(result)
else:
assert result == expected
# Test round-trip with write_f32_le
w = BytesWriter()
for v in F32_TEST_VALUES:
write_f32_le(w, v)
result_bytes = w.getvalue()
for i, v in enumerate(F32_TEST_VALUES):
result = read_f32_le(result_bytes, i * 4)
expected = s.unpack("<f", s.pack("<f", v))[0]
if math.isnan(v):
assert math.isnan(result)
else:
assert result == expected
# Test unaligned offset
data2 = b"\xFF" + s.pack("<f", 1.5) + b"\xFF"
assert read_f32_le(data2, 1) == 1.5
# Test barely out of bounds (only 3 bytes available, need 4)
with assertRaises(IndexError):
read_f32_le(data, len(data) - 3)
def test_bytes_reader_read_f32_be() -> None:
import math
import struct as s
# Test various f32 values (compare against f32-rounded expected value,
# since some f64 values like 0.1 lose precision when stored as f32)
data = b"".join(s.pack(">f", v) for v in F32_TEST_VALUES)
for i, v in enumerate(F32_TEST_VALUES):
result = read_f32_be(data, i * 4)
expected = s.unpack(">f", s.pack(">f", v))[0]
if math.isnan(v):
assert math.isnan(result)
else:
assert result == expected
# Test round-trip with write_f32_be
w = BytesWriter()
for v in F32_TEST_VALUES:
write_f32_be(w, v)
result_bytes = w.getvalue()
for i, v in enumerate(F32_TEST_VALUES):
result = read_f32_be(result_bytes, i * 4)
expected = s.unpack(">f", s.pack(">f", v))[0]
if math.isnan(v):
assert math.isnan(result)
else:
assert result == expected
# Test unaligned offset
data2 = b"\xFF" + s.pack(">f", 1.5) + b"\xFF"
assert read_f32_be(data2, 1) == 1.5
# Test barely out of bounds (only 3 bytes available, need 4)
with assertRaises(IndexError):
read_f32_be(data, len(data) - 3)
def test_read_f32_via_any() -> None:
import math
# Test read_f32_le/be via Any to ensure C extension wrapper works
for read_func, fmt in zip((read_f32_le, read_f32_be), ("<f", ">f")):
data: Any = struct.pack(fmt, 1.5) + struct.pack(fmt, -113.0) + struct.pack(fmt, 0.0)
assert read_func(data, 0) == 1.5
assert read_func(data, 4) == -113.0
assert read_func(data, 8) == 0.0
# Test special values
data2: Any = struct.pack(fmt, float('inf')) + struct.pack(fmt, float('nan'))
assert read_func(data2, 0) == float('inf')
assert math.isnan(read_func(data2, 4))
# Test error cases
# Index out of range
with assertRaises(IndexError, "index 20 out of range for bytes of length 12"):
read_func(data, 20 + int())
with assertRaises(IndexError, "index 9 out of range for bytes of length 12"):
read_func(data, 9 + int()) # Not enough bytes for f32
# Negative index
with assertRaises(ValueError, "index must be non-negative"):
read_func(data, -1 + int())
# Wrong type for bytes argument
with assertRaises(TypeError):
bad: Any = "not bytes"
read_func(bad, 0 + int())
with assertRaises(TypeError):
bad2: Any = bytearray(b"\x00" * 4)
read_func(bad2, 0 + int())
def test_write_bytearray() -> None:
w = BytesWriter()
w.write(bytearray(b"foobar"))
w.write(bytearray(b""))
w.write(bytearray(b"\x00\xf8"))
assert w.getvalue() == b"foobar\x00\xf8"
def test_cast_bytes_writer() -> None:
a: Any = BytesWriter()
b: BytesWriter = a
assert b.getvalue() == b""
a2: Any = "x"
with assertRaises(TypeError):
b = a2
def test_bytes_writer_wrapper_functions() -> None:
cls: Any = BytesWriter
b: Any = cls()
assert repr(b) == "BytesWriter(b'')"
assert len(b) == 0
b.append(ord('a'))
b.append(0)
b.append(255)
b.write(b"foo")
b.write(bytearray(b"bar"))
assert b.getvalue() == b"a\x00\xfffoobar"
assert len(b) == 9
assert b[0] == ord('a')
assert b[8] == ord('r')
assert b[-1] == ord('r')
assert b[-9] == ord('a')
b[0] = 215
b[8] = 0
assert b[0] == 215
assert b[8] == 0
b[-1] = 1
assert b[8] == 1
b[-9] = 2
assert b[0] == 2
assert isinstance(b, cls)
with assertRaises(TypeError):
b.append("x")
with assertRaises(TypeError):
b.write("foo")
with assertRaises(TypeError):
b.append(256)
with assertRaises(TypeError):
b.append(-1)
with assertRaises(IndexError):
b[9]
with assertRaises(IndexError):
b[-10]
with assertRaises(IndexError):
b[9] = 0
with assertRaises(IndexError):
b[-10] = 0
with assertRaises(TypeError):
b[0] = -1
with assertRaises(TypeError):
b[0] = 256
def test_string_writer_basics() -> None:
w = StringWriter()
assert w.getvalue() == ""
def test_string_writer_repr() -> None:
# Kind 1 (ASCII)
w = StringWriter()
assert repr(w) == "StringWriter('')"
w.append(ord('h'))
w.append(ord('i'))
assert repr(w) == "StringWriter('hi')"
# Kind 2 (UCS-2)
w2 = StringWriter()
w2.append(0x100)
w2.append(0x200)
assert repr(w2) == "StringWriter('" + chr(0x100) + chr(0x200) + "')"
# Kind 4 (UCS-4)
w3 = StringWriter()
w3.append(0x10000)
expected = "StringWriter('" + chr(0x10000) + "')"
assert repr(w3) == expected
def test_string_writer_repr_escaping() -> None:
# Kind 1: Test escaping of newline, nul, tab, backslash
w = StringWriter()
w.append(ord('a'))
w.append(ord('\n'))
w.append(0)
w.append(ord('\t'))
w.append(ord('\\'))
assert repr(w) == "StringWriter('a\\n\\x00\\t\\\\')"
# Kind 2: escaping with UCS-2
w2 = StringWriter()
w2.append(0x100)
w2.append(ord('\n'))
assert repr(w2) == "StringWriter('" + chr(0x100) + "\\n')"
# Kind 4: escaping with UCS-4
w3 = StringWriter()
w3.append(0x10000)
w3.append(0)
assert repr(w3) == "StringWriter('" + chr(0x10000) + "\\x00')"
def test_string_writer_len() -> None:
# Kind 1 (ASCII)
w = StringWriter()
assert len(w) == 0
w.append(ord('a'))
assert len(w) == 1
w.append(ord('b'))
w.append(ord('c'))
assert len(w) == 3
# Kind 2 (UCS-2)
w2 = StringWriter()
w2.append(0x100)
assert len(w2) == 1
for i in range(10):
w2.append(0x200 + i)
assert len(w2) == 11
# Kind 4 (UCS-4)
w3 = StringWriter()
w3.append(0x10000)
assert len(w3) == 1
w3.append(0x10001)
w3.append(0x10002)
assert len(w3) == 3
# Test len after growing buffer
w4 = StringWriter()
for i in range(500):
w4.append(ord('X'))
assert len(w4) == 500
def test_string_writer_get_item() -> None:
# Kind 1 (ASCII)
w = StringWriter()
w.append(ord('f'))
w.append(ord('o'))
w.append(ord('o'))
assert w[0 + int()] == ord('f')
assert w[1 + int()] == ord('o')
assert w[2 + int()] == ord('o')
assert w[-1 + int()] == ord('o')
assert w[-2 + int()] == ord('o')
assert w[-3 + int()] == ord('f')
with assertRaises(IndexError):
w[3 + int()]
with assertRaises(IndexError):
w[-4 + int()]
with assertRaises(IndexError):
w[1 << 50]
with assertRaises(IndexError):
w[-(1 << 50)]
# Kind 2 (UCS-2)
w2 = StringWriter()
w2.append(0x100)
w2.append(0x200)
w2.append(0x300)
assert w2[0 + int()] == 0x100
assert w2[1 + int()] == 0x200
assert w2[2 + int()] == 0x300
assert w2[-1 + int()] == 0x300
assert w2[-2 + int()] == 0x200
assert w2[-3 + int()] == 0x100
with assertRaises(IndexError):
w2[3 + int()]
with assertRaises(IndexError):
w2[-4 + int()]
# Kind 4 (UCS-4)
w3 = StringWriter()
w3.append(0x10000)
w3.append(0x10001)
w3.append(0x10002)
assert w3[0 + int()] == 0x10000
assert w3[1 + int()] == 0x10001
assert w3[2 + int()] == 0x10002
assert w3[-1 + int()] == 0x10002
assert w3[-2 + int()] == 0x10001
assert w3[-3 + int()] == 0x10000
with assertRaises(IndexError):
w3[3 + int()]
with assertRaises(IndexError):
w3[-4 + int()]
# Test get_item after buffer growth
w4 = StringWriter()
for i in range(1000):
w4.append(ord('a') + (i % 26))
assert w4[0 + int()] == ord('a')
assert w4[999 + int()] == ord('a') + (999 % 26)
assert w4[500 + int()] == ord('a') + (500 % 26)
assert w4[-1 + int()] == ord('a') + (999 % 26)
assert w4[-1000 + int()] == ord('a')
def test_string_writer_append() -> None:
w = StringWriter()
w.append(ord('a'))
assert w.getvalue() == "a"
w.append(0xff)
assert w.getvalue() == "a\xff"
# Switch kind 1->2
w.append(0x100)
assert w.getvalue() == "a\xff\u0100", w.getvalue()
w.append(0xffff)
assert w.getvalue() == "a\xff\u0100\uffff"
# Switch kind 2->4
w.append(0x10000)
assert w.getvalue() == "a\xff\u0100\uffff" + chr(0x10000)
# Maximum valid Unicode code point (0x10FFFF = 1114111)
w2 = StringWriter()
w2.append(0x10FFFF)
assert w2.getvalue() == chr(0x10FFFF)
# Invalid code points
w3 = StringWriter()
with assertRaises(ValueError, "code point 1114112 is outside valid Unicode range (0-1114111)"):
w3.append(0x110000)
w4 = StringWriter()
with assertRaises(ValueError, "code point -1 is outside valid Unicode range (0-1114111)"):
w4.append(-1)
w5 = StringWriter()
with assertRaises(ValueError, "code point 2097152 is outside valid Unicode range (0-1114111)"):
w5.append(0x200000)
def test_string_writer_write() -> None:
# Kind 1: Write ASCII strings
w = StringWriter()
w.write("hello")
assert w.getvalue() == "hello"
w.write(" world")
assert w.getvalue() == "hello world"
# Write empty string
w.write("")
assert w.getvalue() == "hello world"
# Kind 1 -> Kind 2: Write string with UCS-2 characters
w2 = StringWriter()
w2.write("abc")
assert w2.getvalue() == "abc"
w2.write(chr(0x100) + chr(0x200))
assert w2.getvalue() == "abc" + chr(0x100) + chr(0x200)
w2.write("xyz")
assert w2.getvalue() == "abc" + chr(0x100) + chr(0x200) + "xyz"
# Kind 2: Write all UCS-2
w3 = StringWriter()
w3.append(0x100)
w3.write(chr(0x200) + chr(0x300))
assert w3.getvalue() == chr(0x100) + chr(0x200) + chr(0x300)
# Kind 2 -> Kind 4: Write string with UCS-4 characters
w4 = StringWriter()
w4.write(chr(0x100))
w4.write(chr(0x10000))
assert w4.getvalue() == chr(0x100) + chr(0x10000)
# Kind 4: Write mixed
w5 = StringWriter()
w5.append(0x10000)
w5.write("abc")
w5.write(chr(0x200))
w5.write(chr(0x10001))
assert w5.getvalue() == chr(0x10000) + "abc" + chr(0x200) + chr(0x10001)
# Test with longer strings to trigger buffer growth
w6 = StringWriter()
for _ in range(100):
w6.write("hello")
assert w6.getvalue() == "hello" * 100
assert len(w6) == 500
def test_string_writer_append_grow_same_kind() -> None:
# Test growing buffer while staying in kind 1 (ASCII)
w = StringWriter()
# Append enough ASCII characters to grow beyond embedded buffer
for i in range(1000):
w.append(ord('a') + (i % 26))
assert len(w) == i + 1
result = w.getvalue()
assert len(result) == 1000
for i in range(1000):
assert result[i] == chr(ord('a') + (i % 26))
# Test growing buffer while staying in kind 2
w2 = StringWriter()
w2.append(0x100) # Switch to kind 2
for i in range(1000):
w2.append(0x100 + (i % 100))
assert len(w2) == i + 2
result2 = w2.getvalue()
assert len(result2) == 1001
assert result2[0] == chr(0x100)
for i in range(1000):
assert result2[i + 1] == chr(0x100 + (i % 100))
# Test growing buffer while staying in kind 4
w3 = StringWriter()
w3.append(0x10000) # Switch to kind 4
for i in range(500):
w3.append(0x10000 + (i % 100))
assert len(w3) == i + 2
result3 = w3.getvalue()
assert len(result3) == 501
assert result3[0] == chr(0x10000)
for i in range(500):
assert result3[i + 1] == chr(0x10000 + (i % 100))
def test_string_writer_append_grow_and_switch_kind() -> None:
# Test growing buffer AND switching from kind 1 to kind 2
w = StringWriter()
# Fill with ASCII to grow buffer
for i in range(500):
w.append(ord('A'))
assert len(w) == 500
# Now append non-ASCII that requires kind 2, triggering both grow and kind switch
for i in range(500):
w.append(0x100 + i)
result = w.getvalue()
assert len(result) == 1000
for i in range(500):
assert result[i] == 'A'
for i in range(500):
assert result[500 + i] == chr(0x100 + i)
# Test growing buffer AND switching from kind 2 to kind 4
w2 = StringWriter()
w2.append(0x100) # Switch to kind 2
# Fill with kind 2 characters to grow buffer
for i in range(300):
w2.append(0x200 + (i % 100))
assert len(w2) == 301
# Now append characters that require kind 4, triggering both grow and kind switch
for i in range(300):
w2.append(0x10000 + i)
result2 = w2.getvalue()
assert len(result2) == 601
assert result2[0] == chr(0x100)
for i in range(300):
assert result2[1 + i] == chr(0x200 + (i % 100))
for i in range(300):
assert result2[301 + i] == chr(0x10000 + i)
# Test switching kind 1->4 with buffer growth
w3 = StringWriter()
for i in range(300):
w3.append(ord('X'))
# Jump directly to kind 4
w3.append(0x10000)
result3 = w3.getvalue()
assert len(result3) == 301
for i in range(300):
assert result3[i] == 'X'
assert result3[300] == chr(0x10000)
def test_string_writer_wrapper_functions() -> None:
cls: Any = StringWriter
s: Any = cls()
assert repr(s) == "StringWriter('')"
assert len(s) == 0
s.append(ord('a'))
s.append(0x100)
s.append(0x10000)
s.write("foo")
assert s.getvalue() == "a" + chr(0x100) + chr(0x10000) + "foo"
assert len(s) == 6
assert s[0] == ord('a')
assert s[1] == 0x100
assert s[2] == 0x10000
assert s[-1] == ord('o')
assert s[-6] == ord('a')
assert isinstance(s, cls)
with assertRaises(TypeError):
s.append("x")
with assertRaises(TypeError):
s.write(b"foo")
with assertRaises(ValueError):
s.append(0x110000)
with assertRaises(ValueError):
s.append(-1)
with assertRaises(IndexError):
s[6]
with assertRaises(IndexError):
s[-7]
def test_explicit_init_resets_via_any() -> None:
# Calling __init__ explicitly on an already-initialized writer resets it.
# Any previously-allocated heap buffer must be freed to avoid leaks.
bw: Any = BytesWriter()
bw.write(b"hello")
bw.__init__()
assert bw.getvalue() == b""
bw.write(b"world")
assert bw.getvalue() == b"world"
# Also trigger a heap-allocated buffer before the explicit __init__ call,
# to exercise the path that would otherwise leak memory.
bw2: Any = BytesWriter()
bw2.write(b"x" * 10000)
bw2.__init__()
assert bw2.getvalue() == b""
bw2.write(b"short")
assert bw2.getvalue() == b"short"
sw: Any = StringWriter()
sw.write("hello")
sw.__init__()
assert sw.getvalue() == ""
sw.write("world")
assert sw.getvalue() == "world"
sw2: Any = StringWriter()
sw2.write("y" * 10000)
sw2.__init__()
assert sw2.getvalue() == ""
sw2.write("short")
assert sw2.getvalue() == "short"
def test_new_without_init_is_usable() -> None:
# BytesWriter.__new__(BytesWriter) must return a fully initialized object
# that does not require an explicit __init__ call to be usable.
bw: Any = BytesWriter.__new__(BytesWriter)
assert bw.getvalue() == b""
bw.write(b"hello")
assert bw.getvalue() == b"hello"
sw: Any = StringWriter.__new__(StringWriter)
assert sw.getvalue() == ""
sw.write("hello")
assert sw.getvalue() == "hello"