Rework Python binding
diff --git a/pyproject.toml b/pyproject.toml index 4806a64..cd6beae 100644 --- a/pyproject.toml +++ b/pyproject.toml
@@ -12,6 +12,9 @@ description = "LC3 Codec library wrapper" requires-python = ">=3.10" +[project.optional-dependencies] + dev = ["pytest"] + [project.urls] Homepage = "https://github.com/google/liblc3"
diff --git a/python/lc3.py b/python/lc3.py index e07efee..6301e8d 100644 --- a/python/lc3.py +++ b/python/lc3.py
@@ -13,96 +13,124 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from __future__ import annotations import array import ctypes import enum import glob import os +import typing from ctypes import c_bool, c_byte, c_int, c_uint, c_size_t, c_void_p from ctypes.util import find_library +from collections.abc import Iterable + + +class BaseError(Exception): + """Base error raised by liblc3.""" + + +class InitializationError(RuntimeError, BaseError): + """Error raised when liblc3 cannot be initialized.""" + + +class InvalidArgumentError(ValueError, BaseError): + """Error raised when a bad argument is given.""" + + +class _PcmFormat(enum.IntEnum): + S16 = 0 + S24 = 1 + S24_3LE = 2 + FLOAT = 3 class _Base: - def __init__(self, frame_duration, samplerate, nchannels, **kwargs): + def __init__( + self, + frame_duration_us: int, + sample_rate_hz: int, + num_channels: int, + hrmode: bool = False, + pcm_sample_rate_hz: int | None = None, + libpath: str | None = None, + ) -> None: - self.hrmode = False - self.dt_us = int(frame_duration * 1000) - self.sr_hz = int(samplerate) - self.sr_pcm_hz = self.sr_hz - self.nchannels = nchannels + self.hrmode = hrmode + self.frame_duration_us = frame_duration_us + self.sample_rate_hz = sample_rate_hz + self.pcm_sample_rate_hz = pcm_sample_rate_hz or self.sample_rate_hz + self.num_channels = num_channels - libpath = None + if self.frame_duration_us not in [2500, 5000, 7500, 10000]: + raise InvalidArgumentError( + "Invalid frame duration: %.1f ms" % self.frame_duration_us + ) - for k in kwargs.keys(): - if k == 'hrmode': - self.hrmode = bool(kwargs[k]) - elif k == 'pcm_samplerate': - self.sr_pcm_hz = int(kwargs[k]) - elif k == 'libpath': - libpath = kwargs[k] - else: - raise ValueError("Invalid keyword argument: " + k) + allowed_samplerate = ( + [8000, 16000, 24000, 32000, 48000] if not self.hrmode else [48000, 96000] + ) - if self.dt_us not in [2500, 5000, 7500, 10000]: - raise ValueError( - "Invalid frame duration: %.1f ms" % frame_duration) - - allowed_samplerate = [8000, 16000, 24000, 32000, 48000] \ - if not self.hrmode else [48000, 96000] - - if self.sr_hz not in allowed_samplerate: - raise ValueError("Invalid sample rate: %d Hz" % samplerate) + if self.sample_rate_hz not in allowed_samplerate: + raise InvalidArgumentError("Invalid sample rate: %d Hz" % sample_rate_hz) if libpath is None: - mesonpy_lib = glob.glob(os.path.join(os.path.dirname(__file__), '.lc3.mesonpy.libs', '*lc3*')) + mesonpy_lib = glob.glob( + os.path.join(os.path.dirname(__file__), ".lc3.mesonpy.libs", "*lc3*") + ) if mesonpy_lib: libpath = mesonpy_lib[0] else: libpath = find_library("lc3") if not libpath: - raise Exception("LC3 library not found") + raise InitializationError("LC3 library not found") lib = ctypes.cdll.LoadLibrary(libpath) - try: - lib.lc3_hr_frame_samples \ - and lib.lc3_hr_frame_block_bytes \ - and lib.lc3_hr_resolve_bitrate \ - and lib.lc3_hr_delay_samples - - except AttributeError: - + if not all( + hasattr(lib, func) + for func in ( + "lc3_hr_frame_samples", + "lc3_hr_frame_block_bytes", + "lc3_hr_resolve_bitrate", + "lc3_hr_delay_samples", + ) + ): if self.hrmode: - raise Exception('High-Resolution interface not available') + raise InitializationError("High-Resolution interface not available") - lib.lc3_hr_frame_samples = \ - lambda hrmode, dt_us, sr_hz: \ - lib.lc3_frame_samples(dt_us, sr_hz) - - lib.lc3_hr_frame_block_bytes = \ - lambda hrmode, dt_us, sr_hz, nchannels, bitrate: \ - nchannels * lib.lc3_frame_bytes(dt_us, bitrate // 2) - - lib.lc3_hr_resolve_bitrate = \ - lambda hrmode, dt_us, sr_hz, nbytes: \ - lib.lc3_resolve_bitrate(dt_us, nbytes) - - lib.lc3_hr_delay_samples = \ - lambda hrmode, dt_us, sr_hz: \ - lib.lc3_delay_samples(dt_us, sr_hz) + lc3_hr_frame_samples = lambda hrmode, dt_us, sr_hz: lib.lc3_frame_samples( + dt_us, sr_hz + ) + lc3_hr_frame_block_bytes = ( + lambda hrmode, dt_us, sr_hz, num_channels, bitrate: num_channels + * lib.lc3_frame_bytes(dt_us, bitrate // 2) + ) + lc3_hr_resolve_bitrate = ( + lambda hrmode, dt_us, sr_hz, nbytes: lib.lc3_resolve_bitrate( + dt_us, nbytes + ) + ) + lc3_hr_delay_samples = lambda hrmode, dt_us, sr_hz: lib.lc3_delay_samples( + dt_us, sr_hz + ) + setattr(lib, "lc3_hr_frame_samples", lc3_hr_frame_samples) + setattr(lib, "lc3_hr_frame_block_bytes", lc3_hr_frame_block_bytes) + setattr(lib, "lc3_hr_resolve_bitrate", lc3_hr_resolve_bitrate) + setattr(lib, "lc3_hr_delay_samples", lc3_hr_delay_samples) lib.lc3_hr_frame_samples.argtypes = [c_bool, c_int, c_int] - lib.lc3_hr_frame_block_bytes.argtypes = \ - [c_bool, c_int, c_int, c_int, c_int] + lib.lc3_hr_frame_block_bytes.argtypes = [c_bool, c_int, c_int, c_int, c_int] lib.lc3_hr_resolve_bitrate.argtypes = [c_bool, c_int, c_int, c_int] lib.lc3_hr_delay_samples.argtypes = [c_bool, c_int, c_int] self.lib = lib - libc = ctypes.cdll.LoadLibrary(find_library("c")) + if not (libc_path := find_library("c")): + raise InitializationError("Unable to find libc") + libc = ctypes.cdll.LoadLibrary(libc_path) self.malloc = libc.malloc self.malloc.argtypes = [c_size_t] @@ -111,138 +139,192 @@ self.free = libc.free self.free.argtypes = [c_void_p] - def get_frame_samples(self): + def get_frame_samples(self) -> int: """ - Returns the number of PCM samples in an LC3 frame + Returns the number of PCM samples in an LC3 frame. """ ret = self.lib.lc3_hr_frame_samples( - self.hrmode, self.dt_us, self.sr_pcm_hz) + self.hrmode, self.frame_duration_us, self.pcm_sample_rate_hz + ) if ret < 0: - raise ValueError("Bad parameters") + raise InvalidArgumentError("Bad parameters") return ret - def get_frame_bytes(self, bitrate): + def get_frame_bytes(self, bitrate: int) -> int: """ Returns the size of LC3 frame blocks, from bitrate in bit per seconds. A target `bitrate` equals 0 or `INT32_MAX` returns respectively the minimum and maximum allowed size. """ ret = self.lib.lc3_hr_frame_block_bytes( - self.hrmode, self.dt_us, self.sr_hz, self.nchannels, bitrate) + self.hrmode, + self.frame_duration_us, + self.sample_rate_hz, + self.num_channels, + bitrate, + ) if ret < 0: - raise ValueError("Bad parameters") + raise InvalidArgumentError("Bad parameters") return ret - def resolve_bitrate(self, nbytes): + def resolve_bitrate(self, num_bytes: int) -> int: """ Returns the bitrate in bits per seconds, from the size of LC3 frames. """ ret = self.lib.lc3_hr_resolve_bitrate( - self.hrmode, self.dt_us, self.sr_hz, nbytes) + self.hrmode, self.frame_duration_us, self.sample_rate_hz, num_bytes + ) if ret < 0: - raise ValueError("Bad parameters") + raise InvalidArgumentError("Bad parameters") return ret - def get_delay_samples(self): + def get_delay_samples(self) -> int: """ - Returns the algorithmic delay, as a number of samples. - """ + Returns the algorithmic delay, as a number of samples. + """ ret = self.lib.lc3_hr_delay_samples( - self.hrmode, self.dt_us, self.sr_pcm_hz) + self.hrmode, self.frame_duration_us, self.pcm_sample_rate_hz + ) if ret < 0: - raise ValueError("Bad parameters") + raise InvalidArgumentError("Bad parameters") return ret - @staticmethod - def _resolve_pcm_format(bitdepth): - PCM_FORMAT_S16 = 0 - PCM_FORMAT_S24 = 1 - PCM_FORMAT_S24_3LE = 2 - PCM_FORMAT_FLOAT = 3 - - match bitdepth: - case 16: return (PCM_FORMAT_S16, ctypes.c_int16) - case 24: return (PCM_FORMAT_S24_3LE, 3 * ctypes.c_byte) - case None: return (PCM_FORMAT_FLOAT, ctypes.c_float) - case _: raise ValueError("Could not interpret PCM bitdepth") + @classmethod + def _resolve_pcm_format(cls, bit_depth: int | None) -> tuple[ + _PcmFormat, + type[ctypes.c_int16] | type[ctypes.Array[ctypes.c_byte]] | type[ctypes.c_float], + ]: + match bit_depth: + case 16: + return (_PcmFormat.S16, ctypes.c_int16) + case 24: + return (_PcmFormat.S24_3LE, 3 * ctypes.c_byte) + case None: + return (_PcmFormat.FLOAT, ctypes.c_float) + case _: + raise InvalidArgumentError("Could not interpret PCM bit_depth") class Encoder(_Base): """ - LC3 Encoder wrapper + LC3 Encoder wrapper. - The `frame_duration` expressed in milliseconds is any of 2.5, 5.0, 7.5 - or 10.0. The `samplerate`, in Hertz, is any of 8000, 16000, 24000, 32000 + The `frame_duration_us`, in microsecond, is any of 2500, 5000, 7500, or 10000. + The `sample_rate_hz`, in Hertz, is any of 8000, 16000, 24000, 32000 or 48000, unless High-Resolution mode is enabled. In High-Resolution mode, - the `samplerate` is 48000 or 96000. + the `sample_rate_hz` is 48000 or 96000. - By default, one channel is processed. When `nchannels` is greater than one, + By default, one channel is processed. When `num_channels` is greater than one, the PCM input stream is read interleaved and consecutives LC3 frames are output, for each channel. - Keyword arguments: - hrmode : Enable High-Resolution mode, default is `False`. - sr_pcm_hz : Input PCM samplerate, enable downsampling of input. - libpath : LC3 library path and name + Optional arguments: + hrmode : Enable High-Resolution mode, default is `False`. + input_sample_rate_hz : Input PCM samplerate, enable downsampling of input. + libpath : LC3 library path and name """ class c_encoder_t(c_void_p): pass - def __init__(self, frame_duration, samplerate, nchannels=1, **kwargs): + def __init__( + self, + frame_duration_us: int, + sample_rate_hz: int, + num_channels: int = 1, + hrmode: bool = False, + input_sample_rate_hz: int | None = None, + libpath: str | None = None, + ) -> None: - super().__init__(frame_duration, samplerate, nchannels, **kwargs) + super().__init__( + frame_duration_us, + sample_rate_hz, + num_channels, + hrmode, + input_sample_rate_hz, + libpath, + ) lib = self.lib - try: - lib.lc3_hr_encoder_size \ - and lib.lc3_hr_setup_encoder + if not all( + hasattr(lib, func) + for func in ("lc3_hr_encoder_size", "lc3_hr_setup_encoder") + ): + if self.hrmode: + raise InitializationError("High-Resolution interface not available") - except AttributeError: + lc3_hr_encoder_size = lambda hrmode, dt_us, sr_hz: lib.lc3_encoder_size( + dt_us, sr_hz + ) - assert not self.hrmode - - lib.lc3_hr_encoder_size = \ - lambda hrmode, dt_us, sr_hz: \ - lib.lc3_encoder_size(dt_us, sr_hz) - - lib.lc3_hr_setup_encoder = \ - lambda hrmode, dt_us, sr_hz, sr_pcm_hz, mem: \ - lib.lc3_setup_encoder(dt_us, sr_hz, sr_pcm_hz, mem) + lc3_hr_setup_encoder = ( + lambda hrmode, dt_us, sr_hz, sr_pcm_hz, mem: lib.lc3_setup_encoder( + dt_us, sr_hz, sr_pcm_hz, mem + ) + ) + setattr(lib, "lc3_hr_encoder_size", lc3_hr_encoder_size) + setattr(lib, "lc3_hr_setup_encoder", lc3_hr_setup_encoder) lib.lc3_hr_encoder_size.argtypes = [c_bool, c_int, c_int] lib.lc3_hr_encoder_size.restype = c_uint - lib.lc3_hr_setup_encoder.argtypes = \ - [c_bool, c_int, c_int, c_int, c_void_p] + lib.lc3_hr_setup_encoder.argtypes = [c_bool, c_int, c_int, c_int, c_void_p] lib.lc3_hr_setup_encoder.restype = self.c_encoder_t - lib.lc3_encode.argtypes = \ - [self.c_encoder_t, c_int, c_void_p, c_int, c_int, c_void_p] + lib.lc3_encode.argtypes = [ + self.c_encoder_t, + c_int, + c_void_p, + c_int, + c_int, + c_void_p, + ] - def new_encoder(): return lib.lc3_hr_setup_encoder( - self.hrmode, self.dt_us, self.sr_hz, self.sr_pcm_hz, - self.malloc(lib.lc3_hr_encoder_size( - self.hrmode, self.dt_us, self.sr_pcm_hz))) + def new_encoder(): + return lib.lc3_hr_setup_encoder( + self.hrmode, + self.frame_duration_us, + self.sample_rate_hz, + self.pcm_sample_rate_hz, + self.malloc( + lib.lc3_hr_encoder_size( + self.hrmode, self.frame_duration_us, self.pcm_sample_rate_hz + ) + ), + ) - self.__encoders = [new_encoder() for i in range(nchannels)] + self.__encoders = [new_encoder() for _ in range(num_channels)] - def __del__(self): + def __del__(self) -> None: try: (self.free(encoder) for encoder in self.__encoders) finally: return - def encode(self, pcm, nbytes, bitdepth=None): - """ - Encode LC3 frame(s), for each channel. + @typing.overload + def encode( + self, + pcm: bytes | bytearray | memoryview | Iterable[float], + num_bytes: int, + bit_depth: None = None, + ) -> bytes: ... - The `pcm` input is given in two ways. When no `bitdepth` is defined, + @typing.overload + def encode( + self, pcm: bytes | bytearray | memoryview, num_bytes: int, bit_depth: int + ) -> bytes: ... + + def encode(self, pcm, num_bytes: int, bit_depth: int | None = None) -> bytes: + """ + Encodes LC3 frame(s), for each channel. + + The `pcm` input is given in two ways. When no `bit_depth` is defined, it's a vector of floating point values from -1 to 1, coding the sample - levels. When `bitdepth` is defined, `pcm` is interpreted as a byte-like - object, each sample coded on `bitdepth` bits (16 or 24). + levels. When `bit_depth` is defined, `pcm` is interpreted as a byte-like + object, each sample coded on `bit_depth` bits (16 or 24). The machine endianness, or little endian, is used for 16 or 24 bits width, respectively. In both cases, the `pcm` vector data is padded with zeros when @@ -250,151 +332,191 @@ Channels concatenation of encoded LC3 frames, of `nbytes`, is returned. """ - nchannels = self.nchannels + nchannels = self.num_channels frame_samples = self.get_frame_samples() - (pcm_fmt, pcm_t) = self._resolve_pcm_format(bitdepth) + (pcm_fmt, pcm_t) = self._resolve_pcm_format(bit_depth) pcm_len = nchannels * frame_samples - if bitdepth is None: - pcm_buffer = array.array('f', pcm) + if bit_depth is None: + pcm_buffer = array.array("f", pcm) # Invert test to catch NaN if not abs(sum(pcm)) / frame_samples < 2: - raise ValueError("Out of range PCM input") + raise InvalidArgumentError("Out of range PCM input") padding = max(pcm_len - frame_samples, 0) - pcm_buffer.extend(array.array('f', [0] * padding)) + pcm_buffer.extend(array.array("f", [0] * padding)) else: padding = max(pcm_len * ctypes.sizeof(pcm_t) - len(pcm), 0) - pcm_buffer = bytearray(pcm) + bytearray(padding) + pcm_buffer = bytearray(pcm) + bytearray(padding) # type: ignore - data_buffer = (c_byte * nbytes)() + data_buffer = (c_byte * num_bytes)() data_offset = 0 - for (ich, encoder) in enumerate(self.__encoders): + for ich, encoder in enumerate(self.__encoders): pcm_offset = ich * ctypes.sizeof(pcm_t) pcm = (pcm_t * (pcm_len - ich)).from_buffer(pcm_buffer, pcm_offset) - data_size = nbytes // nchannels + int(ich < nbytes % nchannels) + data_size = num_bytes // nchannels + int(ich < num_bytes % nchannels) data = (c_byte * data_size).from_buffer(data_buffer, data_offset) data_offset += data_size - ret = self.lib.lc3_encode( - encoder, pcm_fmt, pcm, nchannels, len(data), data) + ret = self.lib.lc3_encode(encoder, pcm_fmt, pcm, nchannels, len(data), data) if ret < 0: - raise ValueError("Bad parameters") + raise InvalidArgumentError("Bad parameters") return bytes(data_buffer) class Decoder(_Base): """ - LC3 Decoder wrapper + LC3 Decoder wrapper. - The `frame_duration` expressed in milliseconds is any of 2.5, 5.0, 7.5 - or 10.0. The `samplerate`, in Hertz, is any of 8000, 16000, 24000, 32000 - or 48000, unless High-Resolution mode is enabled. In High-Resolution - mode, the `samplerate` is 48000 or 96000. + The `frame_duration_us`, in microsecond, is any of 2500, 5000, 7500, or 10000. + The `sample_rate_hz`, in Hertz, is any of 8000, 16000, 24000, 32000 + or 48000, unless High-Resolution mode is enabled. In High-Resolution mode, + the `sample_rate_hz` is 48000 or 96000. - By default, one channel is processed. When `nchannels` is greater than one, + By default, one channel is processed. When `num_chanels` is greater than one, the PCM input stream is read interleaved and consecutives LC3 frames are output, for each channel. - Keyword arguments: - hrmode : Enable High-Resolution mode, default is `False`. - sr_pcm_hz : Output PCM samplerate, enable upsampling of output. - libpath : LC3 library path and name + Optional arguments: + hrmode : Enable High-Resolution mode, default is `False`. + output_sample_rate_hz : Output PCM sample_rate_hz, enable upsampling of output. + libpath : LC3 library path and name """ class c_decoder_t(c_void_p): pass - def __init__(self, frame_duration, samplerate, nchannels=1, **kwargs): + def __init__( + self, + frame_duration_us: int, + sample_rate_hz: int, + num_channels: int = 1, + hrmode: bool = False, + output_sample_rate_hz: int | None = None, + libpath: str | None = None, + ) -> None: - super().__init__(frame_duration, samplerate, nchannels, **kwargs) + super().__init__( + frame_duration_us, + sample_rate_hz, + num_channels, + hrmode, + output_sample_rate_hz, + libpath, + ) lib = self.lib - try: - lib.lc3_hr_decoder_size \ - and lib.lc3_hr_setup_decoder + if not all( + hasattr(lib, func) + for func in ("lc3_hr_decoder_size", "lc3_hr_setup_decoder") + ): + if self.hrmode: + raise InitializationError("High-Resolution interface not available") - except AttributeError: + lc3_hr_decoder_size = lambda hrmode, dt_us, sr_hz: lib.lc3_decoder_size( + dt_us, sr_hz + ) - assert not self.hrmode - - lib.lc3_hr_decoder_size = \ - lambda hrmode, dt_us, sr_hz: \ - lib.lc3_decoder_size(dt_us, sr_hz) - - lib.lc3_hr_setup_decoder = \ - lambda hrmode, dt_us, sr_hz, sr_pcm_hz, mem: \ - lib.lc3_setup_decoder(dt_us, sr_hz, sr_pcm_hz, mem) + lc3_hr_setup_decoder = ( + lambda hrmode, dt_us, sr_hz, sr_pcm_hz, mem: lib.lc3_setup_decoder( + dt_us, sr_hz, sr_pcm_hz, mem + ) + ) + setattr(lib, "lc3_hr_decoder_size", lc3_hr_decoder_size) + setattr(lib, "lc3_hr_setup_decoder", lc3_hr_setup_decoder) lib.lc3_hr_decoder_size.argtypes = [c_bool, c_int, c_int] lib.lc3_hr_decoder_size.restype = c_uint - lib.lc3_hr_setup_decoder.argtypes = \ - [c_bool, c_int, c_int, c_int, c_void_p] + lib.lc3_hr_setup_decoder.argtypes = [c_bool, c_int, c_int, c_int, c_void_p] lib.lc3_hr_setup_decoder.restype = self.c_decoder_t - lib.lc3_decode.argtypes = \ - [self.c_decoder_t, c_void_p, c_int, c_int, c_void_p, c_int] + lib.lc3_decode.argtypes = [ + self.c_decoder_t, + c_void_p, + c_int, + c_int, + c_void_p, + c_int, + ] - def new_decoder(): return lib.lc3_hr_setup_decoder( - self.hrmode, self.dt_us, self.sr_hz, self.sr_pcm_hz, - self.malloc(lib.lc3_hr_decoder_size( - self.hrmode, self.dt_us, self.sr_pcm_hz))) + def new_decoder(): + return lib.lc3_hr_setup_decoder( + self.hrmode, + self.frame_duration_us, + self.sample_rate_hz, + self.pcm_sample_rate_hz, + self.malloc( + lib.lc3_hr_decoder_size( + self.hrmode, self.frame_duration_us, self.pcm_sample_rate_hz + ) + ), + ) - self.__decoders = [new_decoder() for i in range(nchannels)] + self.__decoders = [new_decoder() for i in range(num_channels)] - def __del__(self): + def __del__(self) -> None: try: (self.free(decoder) for decoder in self.__decoders) finally: return - def decode(self, data, bitdepth=None): + @typing.overload + def decode( + self, data: bytes | bytearray | memoryview, bit_depth: None = None + ) -> array.array[float]: ... + + @typing.overload + def decode(self, data: bytes | bytearray | memoryview, bit_depth: int) -> bytes: ... + + def decode( + self, data: bytes | bytearray | memoryview, bit_depth: int | None = None + ) -> bytes | array.array[float]: """ - Decode an LC3 frame + Decodes an LC3 frame. The input `data` is the channels concatenation of LC3 frames in a byte-like object. Interleaved PCM samples are returned according to - the `bitdepth` indication. - When no `bitdepth` is defined, it's a vector of floating point values - from -1 to 1, coding the sample levels. When `bitdepth` is defined, - it returns a byte array, each sample coded on `bitdepth` bits. + the `bit_depth` indication. + When no `bit_depth` is defined, it's a vector of floating point values + from -1 to 1, coding the sample levels. When `bit_depth` is defined, + it returns a byte array, each sample coded on `bit_depth` bits. The machine endianness, or little endian, is used for 16 or 24 bits width, respectively. """ - nchannels = self.nchannels - frame_samples = self.get_frame_samples() + num_channels = self.num_channels - (pcm_fmt, pcm_t) = self._resolve_pcm_format(bitdepth) - pcm_len = nchannels * self.get_frame_samples() + (pcm_fmt, pcm_t) = self._resolve_pcm_format(bit_depth) + pcm_len = num_channels * self.get_frame_samples() pcm_buffer = (pcm_t * pcm_len)() data_buffer = bytearray(data) data_offset = 0 - for (ich, decoder) in enumerate(self.__decoders): + for ich, decoder in enumerate(self.__decoders): pcm_offset = ich * ctypes.sizeof(pcm_t) pcm = (pcm_t * (pcm_len - ich)).from_buffer(pcm_buffer, pcm_offset) - data_size = len(data_buffer) // nchannels + \ - int(ich < len(data_buffer) % nchannels) - data = (c_byte * data_size).from_buffer(data_buffer, data_offset) + data_size = len(data_buffer) // num_channels + int( + ich < len(data_buffer) % num_channels + ) + buf = (c_byte * data_size).from_buffer(data_buffer, data_offset) data_offset += data_size ret = self.lib.lc3_decode( - decoder, data, len(data), pcm_fmt, pcm, self.nchannels) + decoder, buf, len(buf), pcm_fmt, pcm, self.num_channels + ) if ret < 0: - raise ValueError("Bad parameters") + raise InvalidArgumentError("Bad parameters") - return array.array('f', pcm_buffer) \ - if bitdepth is None else bytes(pcm_buffer) + return array.array("f", pcm_buffer) if bit_depth is None else bytes(pcm_buffer)
diff --git a/python/tests/basic_test.py b/python/tests/basic_test.py new file mode 100644 index 0000000..de61088 --- /dev/null +++ b/python/tests/basic_test.py
@@ -0,0 +1,47 @@ +import array +import lc3 +import pytest + + +@pytest.mark.parametrize("bit_depth,decoded_length", [(16, 960), (24, 1440)]) +def test_decode_with_bit_depth(bit_depth, decoded_length) -> None: + decoder = lc3.Decoder(frame_duration_us=10000, sample_rate_hz=48000) + decoded_frame = decoder.decode(bytes(120), bit_depth=bit_depth) + assert isinstance(decoded_frame, bytes) + assert len(decoded_frame) == decoded_length + + +def test_decode_without_bit_depth() -> None: + decoder = lc3.Decoder(frame_duration_us=10000, sample_rate_hz=48000) + decoded_frame = decoder.decode(bytes(120)) + assert isinstance(decoded_frame, array.array) + assert len(decoded_frame) == 480 + assert all(isinstance(e, float) for e in decoded_frame) + + +def test_decode_with_bad_bit_depth() -> None: + decoder = lc3.Decoder(frame_duration_us=10000, sample_rate_hz=48000) + with pytest.raises(lc3.InvalidArgumentError): + decoder.decode(bytes(120), bit_depth=128) + + +@pytest.mark.parametrize("bit_depth", [16, 24]) +def test_encode_with_bit_depth(bit_depth) -> None: + encoder = lc3.Encoder(frame_duration_us=10000, sample_rate_hz=48000) + encoded_frame = encoder.encode(bytes(1920), num_bytes=120, bit_depth=bit_depth) + assert isinstance(encoded_frame, bytes) + assert len(encoded_frame) == 120 + + +@pytest.mark.parametrize("pcm", [bytes(1920), [0.0] * 1920]) +def test_encode_without_bit_depth(pcm) -> None: + encoder = lc3.Encoder(frame_duration_us=10000, sample_rate_hz=48000) + encoded_frame = encoder.encode(pcm, num_bytes=120, bit_depth=None) + assert isinstance(encoded_frame, bytes) + assert len(encoded_frame) == 120 + + +def test_encode_with_bad_bit_depth() -> None: + encoder = lc3.Encoder(frame_duration_us=10000, sample_rate_hz=48000) + with pytest.raises(lc3.InvalidArgumentError): + encoder.encode(bytes(1920), num_bytes=120, bit_depth=128)
diff --git a/python/tools/decoder.py b/python/tools/decoder.py index c10c1e4..bc3b554 100755 --- a/python/tools/decoder.py +++ b/python/tools/decoder.py
@@ -53,7 +53,7 @@ samplerate = header[2] * 100 nchannels = header[4] -frame_duration = header[5] / 100 +frame_duration = header[5] * 10 stream_length = header[7] # --- Setup output --- @@ -80,7 +80,7 @@ for i in range(0, encoded_length, frame_length): lc3_frame_size = struct.unpack('=H', f_lc3.read(2))[0] - pcm = dec.decode(f_lc3.read(lc3_frame_size), bitdepth=bitdepth) + pcm = dec.decode(f_lc3.read(lc3_frame_size), bit_depth=bitdepth) pcm = pcm[max(encoded_length - stream_length - i, 0) * pcm_size: min(encoded_length - i, frame_length) * pcm_size]
diff --git a/python/tools/encoder.py b/python/tools/encoder.py index 9f291ff..436d769 100755 --- a/python/tools/encoder.py +++ b/python/tools/encoder.py
@@ -57,7 +57,7 @@ # --- Setup encoder --- enc = lc3.Encoder( - args.frame_duration, samplerate, nchannels, libpath=args.libpath) + int(args.frame_duration)*1000, samplerate, nchannels, libpath=args.libpath) frame_size = enc.get_frame_bytes(args.bitrate) frame_length = enc.get_frame_samples() bitrate = enc.resolve_bitrate(frame_size) @@ -77,7 +77,7 @@ f_lc3.write(struct.pack('=H', frame_size)) pcm = wavfile.readframes(frame_length) - f_lc3.write(enc.encode(pcm, frame_size, bitdepth=bitdepth)) + f_lc3.write(enc.encode(pcm, frame_size, bit_depth=bitdepth)) # --- Cleanup ---