| # |
| # Copyright 2024 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| from __future__ import annotations |
| |
| import array |
| import ctypes |
| import enum |
| import glob |
| import os |
| import typing |
| |
| from ctypes import c_bool, c_byte, c_int, c_uint, c_size_t, c_void_p |
| from ctypes.util import find_library |
| from collections.abc import Iterable |
| |
| |
| class BaseError(Exception): |
| """Base error raised by liblc3.""" |
| |
| |
| class InitializationError(RuntimeError, BaseError): |
| """Error raised when liblc3 cannot be initialized.""" |
| |
| |
| class InvalidArgumentError(ValueError, BaseError): |
| """Error raised when a bad argument is given.""" |
| |
| |
| class _PcmFormat(enum.IntEnum): |
| S16 = 0 |
| S24 = 1 |
| S24_3LE = 2 |
| FLOAT = 3 |
| |
| |
| class _Base: |
| |
| def __init__( |
| self, |
| frame_duration_us: int, |
| sample_rate_hz: int, |
| num_channels: int, |
| hrmode: bool = False, |
| pcm_sample_rate_hz: int | None = None, |
| libpath: str | None = None, |
| ) -> None: |
| |
| self.hrmode = hrmode |
| self.frame_duration_us = frame_duration_us |
| self.sample_rate_hz = sample_rate_hz |
| self.pcm_sample_rate_hz = pcm_sample_rate_hz or self.sample_rate_hz |
| self.num_channels = num_channels |
| |
| if self.frame_duration_us not in [2500, 5000, 7500, 10000]: |
| raise InvalidArgumentError( |
| f"Invalid frame duration: {self.frame_duration_us} us ({self.frame_duration_us / 1000:.1f} ms)" |
| ) |
| |
| allowed_samplerate = ( |
| [8000, 16000, 24000, 32000, 48000] if not self.hrmode else [48000, 96000] |
| ) |
| |
| if self.sample_rate_hz not in allowed_samplerate: |
| raise InvalidArgumentError(f"Invalid sample rate: {sample_rate_hz} Hz") |
| |
| if libpath is None: |
| mesonpy_lib = glob.glob( |
| os.path.join(os.path.dirname(__file__), ".lc3py.mesonpy.libs", "*lc3*") |
| ) |
| |
| if mesonpy_lib: |
| libpath = mesonpy_lib[0] |
| else: |
| libpath = find_library("lc3") |
| if not libpath: |
| raise InitializationError("LC3 library not found") |
| |
| lib = ctypes.cdll.LoadLibrary(libpath) |
| |
| if not all( |
| hasattr(lib, func) |
| for func in ( |
| "lc3_hr_frame_samples", |
| "lc3_hr_frame_block_bytes", |
| "lc3_hr_resolve_bitrate", |
| "lc3_hr_delay_samples", |
| ) |
| ): |
| if self.hrmode: |
| raise InitializationError("High-Resolution interface not available") |
| |
| lc3_hr_frame_samples = lambda hrmode, dt_us, sr_hz: lib.lc3_frame_samples( |
| dt_us, sr_hz |
| ) |
| lc3_hr_frame_block_bytes = ( |
| lambda hrmode, dt_us, sr_hz, num_channels, bitrate: num_channels |
| * lib.lc3_frame_bytes(dt_us, bitrate // 2) |
| ) |
| lc3_hr_resolve_bitrate = ( |
| lambda hrmode, dt_us, sr_hz, nbytes: lib.lc3_resolve_bitrate( |
| dt_us, nbytes |
| ) |
| ) |
| lc3_hr_delay_samples = lambda hrmode, dt_us, sr_hz: lib.lc3_delay_samples( |
| dt_us, sr_hz |
| ) |
| setattr(lib, "lc3_hr_frame_samples", lc3_hr_frame_samples) |
| setattr(lib, "lc3_hr_frame_block_bytes", lc3_hr_frame_block_bytes) |
| setattr(lib, "lc3_hr_resolve_bitrate", lc3_hr_resolve_bitrate) |
| setattr(lib, "lc3_hr_delay_samples", lc3_hr_delay_samples) |
| |
| lib.lc3_hr_frame_samples.argtypes = [c_bool, c_int, c_int] |
| lib.lc3_hr_frame_block_bytes.argtypes = [c_bool, c_int, c_int, c_int, c_int] |
| lib.lc3_hr_resolve_bitrate.argtypes = [c_bool, c_int, c_int, c_int] |
| lib.lc3_hr_delay_samples.argtypes = [c_bool, c_int, c_int] |
| self.lib = lib |
| |
| if not (libc_path := find_library("c")): |
| raise InitializationError("Unable to find libc") |
| libc = ctypes.cdll.LoadLibrary(libc_path) |
| |
| self.malloc = libc.malloc |
| self.malloc.argtypes = [c_size_t] |
| self.malloc.restype = c_void_p |
| |
| self.free = libc.free |
| self.free.argtypes = [c_void_p] |
| |
| def get_frame_samples(self) -> int: |
| """ |
| Returns the number of PCM samples in an LC3 frame. |
| """ |
| ret = self.lib.lc3_hr_frame_samples( |
| self.hrmode, self.frame_duration_us, self.pcm_sample_rate_hz |
| ) |
| if ret < 0: |
| raise InvalidArgumentError("Bad parameters") |
| return ret |
| |
| def get_frame_bytes(self, bitrate: int) -> int: |
| """ |
| Returns the size of LC3 frame blocks, from bitrate in bit per seconds. |
| A target `bitrate` equals 0 or `INT32_MAX` returns respectively |
| the minimum and maximum allowed size. |
| """ |
| ret = self.lib.lc3_hr_frame_block_bytes( |
| self.hrmode, |
| self.frame_duration_us, |
| self.sample_rate_hz, |
| self.num_channels, |
| bitrate, |
| ) |
| if ret < 0: |
| raise InvalidArgumentError("Bad parameters") |
| return ret |
| |
| def resolve_bitrate(self, num_bytes: int) -> int: |
| """ |
| Returns the bitrate in bits per seconds, from the size of LC3 frames. |
| """ |
| ret = self.lib.lc3_hr_resolve_bitrate( |
| self.hrmode, self.frame_duration_us, self.sample_rate_hz, num_bytes |
| ) |
| if ret < 0: |
| raise InvalidArgumentError("Bad parameters") |
| return ret |
| |
| def get_delay_samples(self) -> int: |
| """ |
| Returns the algorithmic delay, as a number of samples. |
| """ |
| ret = self.lib.lc3_hr_delay_samples( |
| self.hrmode, self.frame_duration_us, self.pcm_sample_rate_hz |
| ) |
| if ret < 0: |
| raise InvalidArgumentError("Bad parameters") |
| return ret |
| |
| @classmethod |
| def _resolve_pcm_format(cls, bit_depth: int | None) -> tuple[ |
| _PcmFormat, |
| type[ctypes.c_int16] | type[ctypes.Array[ctypes.c_byte]] | type[ctypes.c_float], |
| ]: |
| match bit_depth: |
| case 16: |
| return (_PcmFormat.S16, ctypes.c_int16) |
| case 24: |
| return (_PcmFormat.S24_3LE, 3 * ctypes.c_byte) |
| case None: |
| return (_PcmFormat.FLOAT, ctypes.c_float) |
| case _: |
| raise InvalidArgumentError("Could not interpret PCM bit_depth") |
| |
| |
| class Encoder(_Base): |
| """ |
| LC3 Encoder wrapper. |
| |
| The `frame_duration_us`, in microsecond, is any of 2500, 5000, 7500, or 10000. |
| The `sample_rate_hz`, in Hertz, is any of 8000, 16000, 24000, 32000 |
| or 48000, unless High-Resolution mode is enabled. In High-Resolution mode, |
| the `sample_rate_hz` is 48000 or 96000. |
| |
| By default, one channel is processed. When `num_channels` is greater than one, |
| the PCM input stream is read interleaved and consecutives LC3 frames are |
| output, for each channel. |
| |
| Optional arguments: |
| hrmode : Enable High-Resolution mode, default is `False`. |
| input_sample_rate_hz : Input PCM samplerate, enable downsampling of input. |
| libpath : LC3 library path and name |
| """ |
| |
| class c_encoder_t(c_void_p): |
| pass |
| |
| def __init__( |
| self, |
| frame_duration_us: int, |
| sample_rate_hz: int, |
| num_channels: int = 1, |
| hrmode: bool = False, |
| input_sample_rate_hz: int | None = None, |
| libpath: str | None = None, |
| ) -> None: |
| |
| super().__init__( |
| frame_duration_us, |
| sample_rate_hz, |
| num_channels, |
| hrmode, |
| input_sample_rate_hz, |
| libpath, |
| ) |
| |
| lib = self.lib |
| |
| if not all( |
| hasattr(lib, func) |
| for func in ("lc3_hr_encoder_size", "lc3_hr_setup_encoder") |
| ): |
| if self.hrmode: |
| raise InitializationError("High-Resolution interface not available") |
| |
| lc3_hr_encoder_size = lambda hrmode, dt_us, sr_hz: lib.lc3_encoder_size( |
| dt_us, sr_hz |
| ) |
| |
| lc3_hr_setup_encoder = ( |
| lambda hrmode, dt_us, sr_hz, sr_pcm_hz, mem: lib.lc3_setup_encoder( |
| dt_us, sr_hz, sr_pcm_hz, mem |
| ) |
| ) |
| setattr(lib, "lc3_hr_encoder_size", lc3_hr_encoder_size) |
| setattr(lib, "lc3_hr_setup_encoder", lc3_hr_setup_encoder) |
| |
| lib.lc3_hr_encoder_size.argtypes = [c_bool, c_int, c_int] |
| lib.lc3_hr_encoder_size.restype = c_uint |
| |
| lib.lc3_hr_setup_encoder.argtypes = [c_bool, c_int, c_int, c_int, c_void_p] |
| lib.lc3_hr_setup_encoder.restype = self.c_encoder_t |
| |
| lib.lc3_encode.argtypes = [ |
| self.c_encoder_t, |
| c_int, |
| c_void_p, |
| c_int, |
| c_int, |
| c_void_p, |
| ] |
| |
| def new_encoder(): |
| return lib.lc3_hr_setup_encoder( |
| self.hrmode, |
| self.frame_duration_us, |
| self.sample_rate_hz, |
| self.pcm_sample_rate_hz, |
| self.malloc( |
| lib.lc3_hr_encoder_size( |
| self.hrmode, self.frame_duration_us, self.pcm_sample_rate_hz |
| ) |
| ), |
| ) |
| |
| self.__encoders = [new_encoder() for _ in range(num_channels)] |
| |
| def __del__(self) -> None: |
| |
| try: |
| (self.free(encoder) for encoder in self.__encoders) |
| finally: |
| return |
| |
| @typing.overload |
| def encode( |
| self, |
| pcm: bytes | bytearray | memoryview | Iterable[float], |
| num_bytes: int, |
| bit_depth: None = None, |
| ) -> bytes: ... |
| |
| @typing.overload |
| def encode( |
| self, pcm: bytes | bytearray | memoryview, num_bytes: int, bit_depth: int |
| ) -> bytes: ... |
| |
| def encode(self, pcm, num_bytes: int, bit_depth: int | None = None) -> bytes: |
| """ |
| Encodes LC3 frame(s), for each channel. |
| |
| The `pcm` input is given in two ways. When no `bit_depth` is defined, |
| it's a vector of floating point values from -1 to 1, coding the sample |
| levels. When `bit_depth` is defined, `pcm` is interpreted as a byte-like |
| object, each sample coded on `bit_depth` bits (16 or 24). |
| The machine endianness, or little endian, is used for 16 or 24 bits |
| width, respectively. |
| In both cases, the `pcm` vector data is padded with zeros when |
| its length is less than the required input samples for the encoder. |
| Channels concatenation of encoded LC3 frames, of `nbytes`, is returned. |
| """ |
| |
| nchannels = self.num_channels |
| frame_samples = self.get_frame_samples() |
| |
| (pcm_fmt, pcm_t) = self._resolve_pcm_format(bit_depth) |
| pcm_len = nchannels * frame_samples |
| |
| if bit_depth is None: |
| pcm_buffer = array.array("f", pcm) |
| |
| # Invert test to catch NaN |
| if not abs(sum(pcm_buffer)) / frame_samples < 2: |
| raise InvalidArgumentError("Out of range PCM input") |
| |
| padding = max(pcm_len - frame_samples, 0) |
| pcm_buffer.extend(array.array("f", [0] * padding)) |
| |
| else: |
| padding = max(pcm_len * ctypes.sizeof(pcm_t) - len(pcm), 0) |
| pcm_buffer = bytearray(pcm) + bytearray(padding) # type: ignore |
| |
| data_buffer = (c_byte * num_bytes)() |
| data_offset = 0 |
| |
| for ich, encoder in enumerate(self.__encoders): |
| |
| pcm_offset = ich * ctypes.sizeof(pcm_t) |
| pcm = (pcm_t * (pcm_len - ich)).from_buffer(pcm_buffer, pcm_offset) |
| |
| data_size = num_bytes // nchannels + int(ich < num_bytes % nchannels) |
| data = (c_byte * data_size).from_buffer(data_buffer, data_offset) |
| data_offset += data_size |
| |
| ret = self.lib.lc3_encode(encoder, pcm_fmt, pcm, nchannels, len(data), data) |
| if ret < 0: |
| raise InvalidArgumentError("Bad parameters") |
| |
| return bytes(data_buffer) |
| |
| |
| class Decoder(_Base): |
| """ |
| LC3 Decoder wrapper. |
| |
| The `frame_duration_us`, in microsecond, is any of 2500, 5000, 7500, or 10000. |
| The `sample_rate_hz`, in Hertz, is any of 8000, 16000, 24000, 32000 |
| or 48000, unless High-Resolution mode is enabled. In High-Resolution mode, |
| the `sample_rate_hz` is 48000 or 96000. |
| |
| By default, one channel is processed. When `num_chanels` is greater than one, |
| the PCM input stream is read interleaved and consecutives LC3 frames are |
| output, for each channel. |
| |
| Optional arguments: |
| hrmode : Enable High-Resolution mode, default is `False`. |
| output_sample_rate_hz : Output PCM sample_rate_hz, enable upsampling of output. |
| libpath : LC3 library path and name |
| """ |
| |
| class c_decoder_t(c_void_p): |
| pass |
| |
| def __init__( |
| self, |
| frame_duration_us: int, |
| sample_rate_hz: int, |
| num_channels: int = 1, |
| hrmode: bool = False, |
| output_sample_rate_hz: int | None = None, |
| libpath: str | None = None, |
| ) -> None: |
| |
| super().__init__( |
| frame_duration_us, |
| sample_rate_hz, |
| num_channels, |
| hrmode, |
| output_sample_rate_hz, |
| libpath, |
| ) |
| |
| lib = self.lib |
| |
| if not all( |
| hasattr(lib, func) |
| for func in ("lc3_hr_decoder_size", "lc3_hr_setup_decoder") |
| ): |
| if self.hrmode: |
| raise InitializationError("High-Resolution interface not available") |
| |
| lc3_hr_decoder_size = lambda hrmode, dt_us, sr_hz: lib.lc3_decoder_size( |
| dt_us, sr_hz |
| ) |
| |
| lc3_hr_setup_decoder = ( |
| lambda hrmode, dt_us, sr_hz, sr_pcm_hz, mem: lib.lc3_setup_decoder( |
| dt_us, sr_hz, sr_pcm_hz, mem |
| ) |
| ) |
| setattr(lib, "lc3_hr_decoder_size", lc3_hr_decoder_size) |
| setattr(lib, "lc3_hr_setup_decoder", lc3_hr_setup_decoder) |
| |
| lib.lc3_hr_decoder_size.argtypes = [c_bool, c_int, c_int] |
| lib.lc3_hr_decoder_size.restype = c_uint |
| |
| lib.lc3_hr_setup_decoder.argtypes = [c_bool, c_int, c_int, c_int, c_void_p] |
| lib.lc3_hr_setup_decoder.restype = self.c_decoder_t |
| |
| lib.lc3_decode.argtypes = [ |
| self.c_decoder_t, |
| c_void_p, |
| c_int, |
| c_int, |
| c_void_p, |
| c_int, |
| ] |
| |
| def new_decoder(): |
| return lib.lc3_hr_setup_decoder( |
| self.hrmode, |
| self.frame_duration_us, |
| self.sample_rate_hz, |
| self.pcm_sample_rate_hz, |
| self.malloc( |
| lib.lc3_hr_decoder_size( |
| self.hrmode, self.frame_duration_us, self.pcm_sample_rate_hz |
| ) |
| ), |
| ) |
| |
| self.__decoders = [new_decoder() for i in range(num_channels)] |
| |
| def __del__(self) -> None: |
| |
| try: |
| (self.free(decoder) for decoder in self.__decoders) |
| finally: |
| return |
| |
| @typing.overload |
| def decode( |
| self, data: bytes | bytearray | memoryview | None, bit_depth: None = None |
| ) -> array.array[float]: ... |
| |
| @typing.overload |
| def decode(self, data: bytes | bytearray | memoryview | None, bit_depth: int) -> bytes: ... |
| |
| def decode( |
| self, data: bytes | bytearray | memoryview | None, bit_depth: int | None = None |
| ) -> bytes | array.array[float]: |
| """ |
| Decodes an LC3 frame. |
| |
| The input `data` is the channels concatenation of LC3 frames in a |
| byte-like object. Interleaved PCM samples are returned according to |
| the `bit_depth` indication. |
| Setting `data` to `None` enables PLC (Packet Loss Concealment) |
| reconstruction for the block of LC3 frames. |
| When no `bit_depth` is defined, it's a vector of floating point values |
| from -1 to 1, coding the sample levels. When `bit_depth` is defined, |
| it returns a byte array, each sample coded on `bit_depth` bits. |
| The machine endianness, or little endian, is used for 16 or 24 bits |
| width, respectively. |
| """ |
| |
| num_channels = self.num_channels |
| |
| (pcm_fmt, pcm_t) = self._resolve_pcm_format(bit_depth) |
| pcm_len = num_channels * self.get_frame_samples() |
| pcm_buffer = (pcm_t * pcm_len)() |
| |
| if data is not None: |
| data_buffer = bytearray(data) |
| data_offset = 0 |
| |
| for ich, decoder in enumerate(self.__decoders): |
| pcm_offset = ich * ctypes.sizeof(pcm_t) |
| pcm = (pcm_t * (pcm_len - ich)).from_buffer(pcm_buffer, pcm_offset) |
| |
| if data is None: |
| ret = self.lib.lc3_decode( |
| decoder, None, 0, pcm_fmt, pcm, self.num_channels |
| ) |
| else: |
| data_size = len(data_buffer) // num_channels + int( |
| ich < len(data_buffer) % num_channels |
| ) |
| buf = (c_byte * data_size).from_buffer(data_buffer, data_offset) |
| data_offset += data_size |
| ret = self.lib.lc3_decode( |
| decoder, buf, len(buf), pcm_fmt, pcm, self.num_channels |
| ) |
| |
| if ret < 0: |
| raise InvalidArgumentError("Bad parameters") |
| |
| return array.array("f", pcm_buffer) if bit_depth is None else bytes(pcm_buffer) |