三菱 CELP 方式音声コーデック (以下、M-CELP) は、デジタル MCA 無線や消防救急デジタル無線に用いられていることが知られる CELP 方式の音声コーデックです。このコーデックはプロプライエタリであり、消防救急デジタル無線に採用されるまではあまり知られていませんでした。消防救急デジタル無線は無線マニアによる聴取需要が大きいこともあり、5ch 上で解析を試みるコミュニティが形成され、結果的に匿名の人物による DSP エミュレータの開発に至りました。本記事では、このコーデックのデコード部におけるLSP パラメータの処理について分析します。
解析するバイナリ
本記事では、デジタル MCA 移動無線電話装置 EF-6190 に書き込まれている DSP 向けプログラム (Wup205r57) を解析するものとして話を進めます。
DSP
M-CELP は、原則として TMS320VC5416 という DSP に実装されることがわかっています。命令セットは TMS320C54x と呼ばれるものです。この逆コンパイルには Ghidra も対応していないので、解析するには逆アセンブリを読んで命令セットリファレンスと照らし合わせるほかありません。
エントリポイント
5ch 上の 549 氏によれば、エントリポイントはそれぞれ
- 初期化: 0x292AA
- エンコード: 0x39311
- デコード: 0x29365
です。今回はデコード部における LSP パラメータの処理が対象なので、0x29365 から処理をたどります。
ビットストリームの解析
ビットストリームの解析部は、0x2C975 (0x29365 -> 0x2EAF4 -> 0x2D352 -> 0x2C975) にあります。0x2C975 で解析されるビットストリームの形式は以下と思われます。(549 氏の分析も参考)
LSP パラメータ(合計 20 ビット):
- voiced_flag: 1 ビット
- main_index: 7 ビット
- sub_index_1: 6 ビット
- sub_index_2: 6 ビット
サブフレーム A(合計 31 ビット – 1, 3 番目のサブフレーム):
- pitch_delay: 8ビット
- fixed_codebook_index: 16ビット
- gain_code_book_index: 7ビット
サブフレーム B(合計 28 ビット – 2, 4 番目のサブフレーム):
- pitch_delay_delta: 5 ビット
- fixed_codebook_index: 16 ビット
- gain_code_book_index: 7 ビット
フレーム終端(合計 6 ビット):
- control_bit: 1 ビット
- 未使用: 5 ビット
フレーム全体のビット数: 143 ビット
ここで注目すべきなのは、LSP パラメータが有声・無声の別で異なる処理が行われると思われることと、コードブックが主・副で分かれていると思われることです。特に後者は、Split Vector Quantization (以下、SVQ) が行われていることを示しています。
LSP のデコード
解析されたビットストリームから LSP をデコードする処理は、0x2CE12 (0x29365 -> 0x2EAF4 -> 0x2D352 -> 0x2D14B -> 0x2D071 -> 0x2CE12) で行われています。ここで、0x2CE12 を Python で実装したコードを示します。
# func_ce12
def _generate_lsp_from_params(self, params: LspParameters):
"""
Generate LSP parameters from codebook indices.
Args:
params: LspParameters containing codebook indices and voiced flag
"""
# Get appropriate weights based on voiced flag
is_voiced = params.voiced_flag == 1
base_weights, history_weights = self._get_weights(is_voiced, "generation")
# Initialize LSP parameters
lsp_params = np.zeros(self.ORDER, dtype=np.int16)
# Get values from codebooks and convert to int16
main_codebook = np.array(LSP_MAIN_CODEBOOK, dtype=np.uint16).astype(np.int16)
sub_codebook = np.array(LSP_SUB_CODEBOOK, dtype=np.uint16).astype(np.int16)
main_value = main_codebook[params.main_index]
sub_value_1 = sub_codebook[params.sub_index_1]
sub_value_2 = sub_codebook[params.sub_index_2]
# Combine main and sub values
lsp_params[:5] = main_value[:5] + sub_value_1[:5]
lsp_params[5:] = main_value[5:] + sub_value_2[5:]
# Apply spacing adjustments
# ar0 = 10, ar2 = 0x0060
self._enforce_minimum_lsp_separation(lsp_params, 10)
# ar0 = 5, ar2 = 0x0060
self._enforce_minimum_lsp_separation(lsp_params, 5)
# Apply smoothing filter
# ar1 = 0x7BEC, ar2 = 0x0060, ar3 = 0xF205, ar4 = 0x6C8A, ar5 = 0xF1C9
self._current_lsp_params = self._apply_smoothing_filter(
lsp_params, base_weights, self._lsp_history, history_weights
)
# Update history with new parameters
# ar2 = 0x0060, ar3 = 0x6C8A
self._update_history(lsp_params)
# Stabilize and store results
# ar2 = 0x7BEC
self._enforce_lsp_stability_constraints(self._current_lsp_params)
SVQ のデコードは極めて簡単で、各コードブックから参照した値を前後半に分けて加算しているだけです。その後、LSP 間の最小距離を確保して安定性を高め、過去の LSP の履歴をもとに LSP の変化を緩やかにしています。最後に、最小値と最大値のクリッピングを行い、再び LSP 間の最小距離を確保して安定性を高めています。LSP の変化の平滑化では、有声・無声の別によってフィルタ重みを調整して様子もあります。
総括
商用の CELP 方式音声コーデックである M-CELP を分析し、計算複雑性の低減やデコードされた音声の品質の向上の方法がわかりました。
付録
M-CELP のデコーダの一部を Python で実装したコードを付録とします。
from typing import Literal
import numpy as np
from numpy.typing import NDArray
from .frame import Frame, LspParameters
from .lsp_constants import (
LSP_MAIN_CODEBOOK,
LSP_SUB_CODEBOOK,
LSP_UNVOICED_HISTORY_WEIGHTS,
LSP_VOICED_HISTORY_WEIGHTS,
LSP_UNVOICED_BASE_WEIGHTS,
LSP_VOICED_BASE_WEIGHTS,
LSP_UNVOICED_PREDICTION_WEIGHTS,
LSP_VOICED_PREDICTION_WEIGHTS,
LSP_COSINE_LOOKUP_TABLE,
LSP_SINE_LOOKUP_TABLE,
)
class Mcelp:
# LSP configuration constants
ORDER = 10 # Number of LSP coefficients per frame
# LSP stability constraint constants
MIN_LSP_VALUE = 0x29 # Minimum value for first LSP
MAX_LSP_VALUE = 0x6452 # Maximum value for last LSP
MIN_LSP_SPACING = 0x141 # Minimum spacing between adjacent LSPs
# LSP frequency domain transformation constants
LSP_SCALING = 0x517D # Initial scaling factor
MAX_TABLE_INDEX = 0x3F # Maximum lookup table index (63)
# Default LSP values for initialization
# fmt: off
_DEFAULT_LSP_PARAMS = np.array(
[0x0924, 0x1247, 0x1B6B, 0x248F, 0x2DB2, 0x36D6, 0x3FF9, 0x491D, 0x5241, 0x5B64],
dtype=np.uint16
).astype(np.int16)
_DEFAULT_FREQUENCY_DOMAIN_LSP_PARAMS = np.array(
[0x7AD1, 0x6BB0, 0x53D4, 0x352C, 0x1237, 0xEDC9, 0xCAD4, 0xAC2C, 0x9450, 0x852F],
dtype=np.uint16
).astype(np.int16)
# fmt: on
def __init__(self) -> None:
"""Initialize MCELP with default state."""
self._reset_state()
def _reset_state(self) -> None:
"""Reset internal state to default values."""
# 0x6C8A
self._lsp_history = np.stack([self._DEFAULT_LSP_PARAMS] * 3)
# 0x7BEC
self._current_lsp_params = self._DEFAULT_LSP_PARAMS.copy()
# 0x6CB2
self._latest_lsp_params = self._DEFAULT_LSP_PARAMS.copy()
# 0x6F34
self._current_frequency_domain_lsp_params = (
self._DEFAULT_FREQUENCY_DOMAIN_LSP_PARAMS.copy()
)
# 0x6CBC
self._latest_frequency_domain_lsp_params = (
self._DEFAULT_FREQUENCY_DOMAIN_LSP_PARAMS.copy()
)
self._voiced_flag = False
def _get_weights(
self, voiced_flag: bool, weight_type: Literal["generation", "prediction"]
) -> tuple[np.ndarray, np.ndarray]:
"""
Get appropriate weights based on voice flag and weight type.
Args:
voiced_flag: True if frame is voiced, False otherwise
weight_type: Type of weights to retrieve ("generation" or "prediction")
Returns:
Tuple of (base_weights, history_weights)
"""
# Select the appropriate base weights based on weight type and voiced flag
if weight_type == "generation":
base_weights_source = (
LSP_VOICED_BASE_WEIGHTS if voiced_flag else LSP_UNVOICED_BASE_WEIGHTS
)
else: # prediction
base_weights_source = (
LSP_VOICED_PREDICTION_WEIGHTS
if voiced_flag
else LSP_UNVOICED_PREDICTION_WEIGHTS
)
# Convert base weights to int16
base_weights = np.array(base_weights_source, dtype=np.uint16).astype(np.int16)
# Get history weights (same for both generation and prediction)
history_weights_source = (
LSP_VOICED_HISTORY_WEIGHTS if voiced_flag else LSP_UNVOICED_HISTORY_WEIGHTS
)
history_weights = np.array(history_weights_source, dtype=np.uint16).astype(
np.int16
)
return base_weights, history_weights
# func_ca35
def _enforce_minimum_lsp_separation(
self,
lsp_params: NDArray[np.int16], # ar2
threshold: int, # ar0
) -> None:
"""
Enforce minimum separation between adjacent LSP parameters.
Args:
lsp_params: Array of LSP parameters to adjust
threshold: Minimum separation threshold
"""
for i in range(self.ORDER - 1):
# Calculate difference between current and next parameter
difference = lsp_params[i] - lsp_params[i + 1]
adjusted_difference = difference + threshold
# If parameters are too close, adjust them equally in opposite directions
if adjusted_difference > 0:
half_adjustment = adjusted_difference >> 1
lsp_params[i] -= half_adjustment
lsp_params[i + 1] += half_adjustment
# func_c960
def _apply_smoothing_filter(
self,
current_values: NDArray[np.int16], # ar2
current_weights: NDArray[np.int16], # ar3
history_values: NDArray[np.int16], # ar4
history_weights: NDArray[np.int16], # ar5
) -> NDArray[np.int16]: # ar1
"""
Apply smoothing filter to LSP parameters using weighted history.
Args:
current_values: Current LSP parameters
current_weights: Weights for current parameters
history_values: Historical LSP parameters
history_weights: Weights for historical parameters
Returns:
Filtered LSP parameters
"""
# Convert to int64 to prevent overflow during calculations
current_values = current_values.astype(np.int64)
current_weights = current_weights.astype(np.int64)
history_values = history_values.astype(np.int64)
history_weights = history_weights.astype(np.int64)
filtered_values = np.zeros(self.ORDER, dtype=np.uint16)
for i in range(self.ORDER):
accumulator = current_values[i] * current_weights[i]
for j in range(3): # Process 3 frames of history
accumulator += history_values[j][i] * history_weights[j][i]
# Scale down by 2^15
filtered_values[i] = accumulator >> 15
return filtered_values.astype(np.int16)
# func_c941
def _predict_lsp_with_feedback(
self,
current_values: NDArray[np.int16], # ar1
current_weights: NDArray[np.int16], # ar4
history_values: NDArray[np.int16], # ar2
history_weights: NDArray[np.int16], # ar3
) -> NDArray[np.int16]: # ar5
"""
Predict LSP parameters with feedback mechanism.
Args:
current_values: Current LSP parameters
current_weights: Weights for current parameters
history_values: Historical LSP parameters
history_weights: Weights for historical parameters
Returns:
Predicted LSP parameters
"""
# Convert to int64 to prevent overflow during calculations
current_values = current_values.astype(np.int64)
current_weights = current_weights.astype(np.int64)
history_values = history_values.astype(np.int64)
history_weights = history_weights.astype(np.int64)
predicted_values = np.zeros(self.ORDER, dtype=np.uint16)
feedback = 0
for i in range(self.ORDER):
base = current_values[i] << 16
for j in range(3): # Process 3 frames of history
base -= (history_values[j][i] * history_weights[j][i]) << 1
intermediate = base & 0xFFFF
base -= feedback
feedback += (current_weights[i] * intermediate) << 1
feedback >>= 16
feedback += (current_weights[i] * (base >> 16)) << 1
predicted_values[i] = feedback >> 13
return predicted_values.astype(np.int16)
# func_c9ce
def _enforce_lsp_stability_constraints(
self,
lsp_params: NDArray[np.int16], # ar2
) -> None:
"""
Enforce stability constraints on LSP parameters:
1. Ensure monotonic increase
2. Apply minimum/maximum value constraints
3. Ensure minimum spacing
Args:
lsp_params: LSP parameters to stabilize
"""
# Step 1: Single pass to ensure monotonic increase
for i in range(len(lsp_params) - 1):
if lsp_params[i] > lsp_params[i + 1]:
# Swap adjacent LSPs if out of order
lsp_params[i], lsp_params[i + 1] = lsp_params[i + 1], lsp_params[i]
# Step 2: Apply minimum value constraint to first LSP
lsp_params[0] = max(lsp_params[0], self.MIN_LSP_VALUE)
# Step 3: Ensure minimum spacing between adjacent LSPs
# Uses original values as reference to prevent cascading shifts
for i in range(len(lsp_params) - 1):
min_next_value = lsp_params[i] + self.MIN_LSP_SPACING
if lsp_params[i + 1] < min_next_value:
lsp_params[i + 1] = min_next_value
# Step 4: Apply maximum value constraint to last LSP
lsp_params[-1] = min(lsp_params[-1], self.MAX_LSP_VALUE)
# func_c7ee
def _transform_lsp_to_frequency_domain(
self, lsp_params: NDArray[np.int16] # ar2
) -> NDArray[np.int16]: # ar3
"""
Transform LSP parameters from line-pair domain to frequency domain.
Args:
lsp_params: LSP parameters to transform
Returns:
Transformed frequency domain parameters
"""
# Convert lookup tables to int64 to prevent overflow
cos_table = (
np.array(LSP_COSINE_LOOKUP_TABLE, dtype=np.uint16)
.astype(np.int16)
.astype(np.int64)
)
sin_table = (
np.array(LSP_SINE_LOOKUP_TABLE, dtype=np.uint16)
.astype(np.int16)
.astype(np.int64)
)
scaled_values = np.zeros(self.ORDER, dtype=np.uint16)
lsp_params = lsp_params.astype(np.uint64)
for i in range(self.ORDER):
# Initial scaling
initial_product = (lsp_params[i] * self.LSP_SCALING) << 1
# Calculate lookup table index and interpolation fraction
table_index = initial_product >> 8
table_index = min(table_index, self.MAX_TABLE_INDEX << 16) >> 16
fraction = ((initial_product >> 16) & 0xFF) << 3
# Linear interpolation between lookup table values
base_value = cos_table[table_index] << 16
scale_factor = sin_table[table_index]
interpolation = (scale_factor * fraction).astype(np.int64) << 1
scaled_values[i] = (base_value + interpolation).astype(np.int64) >> 16
return scaled_values.astype(np.int16)
# func_c9c2
def _update_history(
self,
lsp_params: NDArray[np.int16], # ar2
) -> None:
"""
Update LSP parameter history by shifting and inserting new parameters.
Args:
lsp_params: New LSP parameters to add to history
"""
# Shift history array and insert new parameters at index 0
# self._lsp_history: ar3
self._lsp_history = np.roll(self._lsp_history, 1, axis=0)
self._lsp_history[0] = lsp_params.copy()
# func_ce12
def _generate_lsp_from_params(self, params: LspParameters):
"""
Generate LSP parameters from codebook indices.
Args:
params: LspParameters containing codebook indices and voiced flag
"""
# Get appropriate weights based on voiced flag
is_voiced = params.voiced_flag == 1
base_weights, history_weights = self._get_weights(is_voiced, "generation")
# Initialize LSP parameters
lsp_params = np.zeros(self.ORDER, dtype=np.int16)
# Get values from codebooks and convert to int16
main_codebook = np.array(LSP_MAIN_CODEBOOK, dtype=np.uint16).astype(np.int16)
sub_codebook = np.array(LSP_SUB_CODEBOOK, dtype=np.uint16).astype(np.int16)
main_value = main_codebook[params.main_index]
sub_value_1 = sub_codebook[params.sub_index_1]
sub_value_2 = sub_codebook[params.sub_index_2]
# Combine main and sub values
lsp_params[:5] = main_value[:5] + sub_value_1[:5]
lsp_params[5:] = main_value[5:] + sub_value_2[5:]
# Apply spacing adjustments
# ar0 = 10, ar2 = 0x0060
self._enforce_minimum_lsp_separation(lsp_params, 10)
# ar0 = 5, ar2 = 0x0060
self._enforce_minimum_lsp_separation(lsp_params, 5)
# Apply smoothing filter
# ar1 = 0x7BEC, ar2 = 0x0060, ar3 = 0xF205, ar4 = 0x6C8A, ar5 = 0xF1C9
self._current_lsp_params = self._apply_smoothing_filter(
lsp_params, base_weights, self._lsp_history, history_weights
)
# Update history with new parameters
# ar2 = 0x0060, ar3 = 0x6C8A
self._update_history(lsp_params)
# Stabilize and store results
# ar2 = 0x7BEC
self._enforce_lsp_stability_constraints(self._current_lsp_params)
# func_d071 Part 1
def _generate_lsp_from_codebook(self, params: LspParameters) -> None:
"""
Generate new LSP parameters from codebook indices.
Args:
params: LspParameters containing codebook indices and voiced flag
"""
# ar1 = 0x7BEC, ar7 = 0x6C8A
self._generate_lsp_from_params(params)
# ar3(0x7BEC) -> ar2(0x6CB2)
self._latest_lsp_params = self._current_lsp_params.copy()
self._voiced_flag = params.voiced_flag == 1
# func_d071 Part 2
def _predict_lsp_parameters(self) -> None:
"""
Predict LSP parameters based on history and current state.
"""
# Get appropriate weights based on voiced flag
base_weights, history_weights = self._get_weights(
self._voiced_flag, "prediction"
)
# Copy latest parameters to current
# ar2(0x6CB2) -> ar3(0x7BEC)
self._current_lsp_params = self._latest_lsp_params.copy()
# Calculate predicted parameters
# ar1 = 0x6CB2, ar2 = 0x6C8A, ar3 = 0xF1E7, ar4 = 0xF223, ar5 = 0x7BE2
predicted_lsp_params = self._predict_lsp_with_feedback(
self._latest_lsp_params,
base_weights,
self._lsp_history,
history_weights,
)
# Update history with predicted parameters
# ar2 = 0x7BE2, ar3 = 0x6C8A
self._update_history(predicted_lsp_params)
# func_d071
def process_frame(self, frame: Frame) -> np.ndarray:
"""
Process a frame of speech data to generate or predict LSP parameters.
Args:
frame: Frame containing control bit and LSP parameters
Returns:
Frequency domain LSP parameters
"""
if frame.control_bit == 0:
# Generate new LSP from codebook
self._generate_lsp_from_codebook(frame.lsp_params)
else:
# Predict LSP based on history
self._predict_lsp_parameters()
# Transform to frequency domain
# ar2 = 0x7BEC, ar3 = 0x6F34
self._current_frequency_domain_lsp_params = (
self._transform_lsp_to_frequency_domain(self._latest_lsp_params)
)
return self._current_frequency_domain_lsp_params