三菱 CELP 方式音声コーデックの分析 (デコーダ・LSP パラメータ編)

三菱 CELP 方式音声コーデック (以下、M-CELP) は、デジタル MCA 無線や消防救急デジタル無線に用いられていることが知られる CELP 方式の音声コーデックです。このコーデックはプロプライエタリであり、消防救急デジタル無線に採用されるまではあまり知られていませんでした。消防救急デジタル無線は無線マニアによる聴取需要が大きいこともあり、5ch 上で解析を試みるコミュニティが形成され、結果的に匿名の人物による DSP エミュレータの開発に至りました。本記事では、このコーデックのデコード部におけるLSP パラメータの処理について分析します。

解析するバイナリ

本記事では、デジタル MCA 移動無線電話装置 EF-6190 に書き込まれている DSP 向けプログラム (Wup205r57) を解析するものとして話を進めます。

DSP

M-CELP は、原則として TMS320VC5416 という DSP に実装されることがわかっています。命令セットは TMS320C54x と呼ばれるものです。この逆コンパイルには Ghidra も対応していないので、解析するには逆アセンブリを読んで命令セットリファレンスと照らし合わせるほかありません。

エントリポイント

5ch 上の 549 氏によれば、エントリポイントはそれぞれ

  • 初期化: 0x292AA
  • エンコード: 0x39311
  • デコード: 0x29365

です。今回はデコード部における LSP パラメータの処理が対象なので、0x29365 から処理をたどります。

ビットストリームの解析

ビットストリームの解析部は、0x2C975 (0x29365 -> 0x2EAF4 -> 0x2D352 -> 0x2C975) にあります。0x2C975 で解析されるビットストリームの形式は以下と思われます。(549 氏の分析も参考)

LSP パラメータ(合計 20 ビット):

  • voiced_flag: 1 ビット
  • main_index: 7 ビット
  • sub_index_1: 6 ビット
  • sub_index_2: 6 ビット

サブフレーム A(合計 31 ビット – 1, 3 番目のサブフレーム):

  • pitch_delay: 8ビット
  • fixed_codebook_index: 16ビット
  • gain_code_book_index: 7ビット

サブフレーム B(合計 28 ビット – 2, 4 番目のサブフレーム):

  • pitch_delay_delta: 5 ビット
  • fixed_codebook_index: 16 ビット
  • gain_code_book_index: 7 ビット

フレーム終端(合計 6 ビット):

  • control_bit: 1 ビット
  • 未使用: 5 ビット

フレーム全体のビット数: 143 ビット

ここで注目すべきなのは、LSP パラメータが有声・無声の別で異なる処理が行われると思われることと、コードブックが主・副で分かれていると思われることです。特に後者は、Split Vector Quantization (以下、SVQ) が行われていることを示しています。

LSP のデコード

解析されたビットストリームから LSP をデコードする処理は、0x2CE12 (0x29365 -> 0x2EAF4 -> 0x2D352 -> 0x2D14B -> 0x2D071 -> 0x2CE12) で行われています。ここで、0x2CE12 を Python で実装したコードを示します。

    # func_ce12
    def _generate_lsp_from_params(self, params: LspParameters):
        """
        Generate LSP parameters from codebook indices.

        Args:
            params: LspParameters containing codebook indices and voiced flag
        """
        # Get appropriate weights based on voiced flag
        is_voiced = params.voiced_flag == 1
        base_weights, history_weights = self._get_weights(is_voiced, "generation")

        # Initialize LSP parameters
        lsp_params = np.zeros(self.ORDER, dtype=np.int16)

        # Get values from codebooks and convert to int16
        main_codebook = np.array(LSP_MAIN_CODEBOOK, dtype=np.uint16).astype(np.int16)
        sub_codebook = np.array(LSP_SUB_CODEBOOK, dtype=np.uint16).astype(np.int16)

        main_value = main_codebook[params.main_index]
        sub_value_1 = sub_codebook[params.sub_index_1]
        sub_value_2 = sub_codebook[params.sub_index_2]

        # Combine main and sub values
        lsp_params[:5] = main_value[:5] + sub_value_1[:5]
        lsp_params[5:] = main_value[5:] + sub_value_2[5:]

        # Apply spacing adjustments
        # ar0 = 10, ar2 = 0x0060
        self._enforce_minimum_lsp_separation(lsp_params, 10)
        # ar0 = 5, ar2 = 0x0060
        self._enforce_minimum_lsp_separation(lsp_params, 5)

        # Apply smoothing filter
        # ar1 = 0x7BEC, ar2 = 0x0060, ar3 = 0xF205, ar4 = 0x6C8A, ar5 = 0xF1C9
        self._current_lsp_params = self._apply_smoothing_filter(
            lsp_params, base_weights, self._lsp_history, history_weights
        )

        # Update history with new parameters
        # ar2 = 0x0060, ar3 = 0x6C8A
        self._update_history(lsp_params)

        # Stabilize and store results
        # ar2 = 0x7BEC
        self._enforce_lsp_stability_constraints(self._current_lsp_params)

SVQ のデコードは極めて簡単で、各コードブックから参照した値を前後半に分けて加算しているだけです。その後、LSP 間の最小距離を確保して安定性を高め、過去の LSP の履歴をもとに LSP の変化を緩やかにしています。最後に、最小値と最大値のクリッピングを行い、再び LSP 間の最小距離を確保して安定性を高めています。LSP の変化の平滑化では、有声・無声の別によってフィルタ重みを調整して様子もあります。

総括

商用の CELP 方式音声コーデックである M-CELP を分析し、計算複雑性の低減やデコードされた音声の品質の向上の方法がわかりました。

付録

M-CELP のデコーダの一部を Python で実装したコードを付録とします。

from typing import Literal
import numpy as np
from numpy.typing import NDArray

from .frame import Frame, LspParameters
from .lsp_constants import (
    LSP_MAIN_CODEBOOK,
    LSP_SUB_CODEBOOK,
    LSP_UNVOICED_HISTORY_WEIGHTS,
    LSP_VOICED_HISTORY_WEIGHTS,
    LSP_UNVOICED_BASE_WEIGHTS,
    LSP_VOICED_BASE_WEIGHTS,
    LSP_UNVOICED_PREDICTION_WEIGHTS,
    LSP_VOICED_PREDICTION_WEIGHTS,
    LSP_COSINE_LOOKUP_TABLE,
    LSP_SINE_LOOKUP_TABLE,
)


class Mcelp:
    # LSP configuration constants
    ORDER = 10  # Number of LSP coefficients per frame

    # LSP stability constraint constants
    MIN_LSP_VALUE = 0x29  # Minimum value for first LSP
    MAX_LSP_VALUE = 0x6452  # Maximum value for last LSP
    MIN_LSP_SPACING = 0x141  # Minimum spacing between adjacent LSPs

    # LSP frequency domain transformation constants
    LSP_SCALING = 0x517D  # Initial scaling factor
    MAX_TABLE_INDEX = 0x3F  # Maximum lookup table index (63)

    # Default LSP values for initialization
    # fmt: off
    _DEFAULT_LSP_PARAMS = np.array(
        [0x0924, 0x1247, 0x1B6B, 0x248F, 0x2DB2, 0x36D6, 0x3FF9, 0x491D, 0x5241, 0x5B64],
        dtype=np.uint16
    ).astype(np.int16)

    _DEFAULT_FREQUENCY_DOMAIN_LSP_PARAMS = np.array(
        [0x7AD1, 0x6BB0, 0x53D4, 0x352C, 0x1237, 0xEDC9, 0xCAD4, 0xAC2C, 0x9450, 0x852F],
        dtype=np.uint16
    ).astype(np.int16)
    # fmt: on

    def __init__(self) -> None:
        """Initialize MCELP with default state."""
        self._reset_state()

    def _reset_state(self) -> None:
        """Reset internal state to default values."""
        # 0x6C8A
        self._lsp_history = np.stack([self._DEFAULT_LSP_PARAMS] * 3)
        # 0x7BEC
        self._current_lsp_params = self._DEFAULT_LSP_PARAMS.copy()
        # 0x6CB2
        self._latest_lsp_params = self._DEFAULT_LSP_PARAMS.copy()
        # 0x6F34
        self._current_frequency_domain_lsp_params = (
            self._DEFAULT_FREQUENCY_DOMAIN_LSP_PARAMS.copy()
        )
        # 0x6CBC
        self._latest_frequency_domain_lsp_params = (
            self._DEFAULT_FREQUENCY_DOMAIN_LSP_PARAMS.copy()
        )
        self._voiced_flag = False

    def _get_weights(
        self, voiced_flag: bool, weight_type: Literal["generation", "prediction"]
    ) -> tuple[np.ndarray, np.ndarray]:
        """
        Get appropriate weights based on voice flag and weight type.

        Args:
            voiced_flag: True if frame is voiced, False otherwise
            weight_type: Type of weights to retrieve ("generation" or "prediction")

        Returns:
            Tuple of (base_weights, history_weights)
        """
        # Select the appropriate base weights based on weight type and voiced flag
        if weight_type == "generation":
            base_weights_source = (
                LSP_VOICED_BASE_WEIGHTS if voiced_flag else LSP_UNVOICED_BASE_WEIGHTS
            )
        else:  # prediction
            base_weights_source = (
                LSP_VOICED_PREDICTION_WEIGHTS
                if voiced_flag
                else LSP_UNVOICED_PREDICTION_WEIGHTS
            )

        # Convert base weights to int16
        base_weights = np.array(base_weights_source, dtype=np.uint16).astype(np.int16)

        # Get history weights (same for both generation and prediction)
        history_weights_source = (
            LSP_VOICED_HISTORY_WEIGHTS if voiced_flag else LSP_UNVOICED_HISTORY_WEIGHTS
        )
        history_weights = np.array(history_weights_source, dtype=np.uint16).astype(
            np.int16
        )

        return base_weights, history_weights

    # func_ca35
    def _enforce_minimum_lsp_separation(
        self,
        lsp_params: NDArray[np.int16],  # ar2
        threshold: int,  # ar0
    ) -> None:
        """
        Enforce minimum separation between adjacent LSP parameters.

        Args:
            lsp_params: Array of LSP parameters to adjust
            threshold: Minimum separation threshold
        """
        for i in range(self.ORDER - 1):
            # Calculate difference between current and next parameter
            difference = lsp_params[i] - lsp_params[i + 1]
            adjusted_difference = difference + threshold

            # If parameters are too close, adjust them equally in opposite directions
            if adjusted_difference > 0:
                half_adjustment = adjusted_difference >> 1
                lsp_params[i] -= half_adjustment
                lsp_params[i + 1] += half_adjustment

    # func_c960
    def _apply_smoothing_filter(
        self,
        current_values: NDArray[np.int16],  # ar2
        current_weights: NDArray[np.int16],  # ar3
        history_values: NDArray[np.int16],  # ar4
        history_weights: NDArray[np.int16],  # ar5
    ) -> NDArray[np.int16]:  # ar1
        """
        Apply smoothing filter to LSP parameters using weighted history.

        Args:
            current_values: Current LSP parameters
            current_weights: Weights for current parameters
            history_values: Historical LSP parameters
            history_weights: Weights for historical parameters

        Returns:
            Filtered LSP parameters
        """
        # Convert to int64 to prevent overflow during calculations
        current_values = current_values.astype(np.int64)
        current_weights = current_weights.astype(np.int64)
        history_values = history_values.astype(np.int64)
        history_weights = history_weights.astype(np.int64)

        filtered_values = np.zeros(self.ORDER, dtype=np.uint16)

        for i in range(self.ORDER):
            accumulator = current_values[i] * current_weights[i]

            for j in range(3):  # Process 3 frames of history
                accumulator += history_values[j][i] * history_weights[j][i]

            # Scale down by 2^15
            filtered_values[i] = accumulator >> 15

        return filtered_values.astype(np.int16)

    # func_c941
    def _predict_lsp_with_feedback(
        self,
        current_values: NDArray[np.int16],  # ar1
        current_weights: NDArray[np.int16],  # ar4
        history_values: NDArray[np.int16],  # ar2
        history_weights: NDArray[np.int16],  # ar3
    ) -> NDArray[np.int16]:  # ar5
        """
        Predict LSP parameters with feedback mechanism.

        Args:
            current_values: Current LSP parameters
            current_weights: Weights for current parameters
            history_values: Historical LSP parameters
            history_weights: Weights for historical parameters

        Returns:
            Predicted LSP parameters
        """
        # Convert to int64 to prevent overflow during calculations
        current_values = current_values.astype(np.int64)
        current_weights = current_weights.astype(np.int64)
        history_values = history_values.astype(np.int64)
        history_weights = history_weights.astype(np.int64)

        predicted_values = np.zeros(self.ORDER, dtype=np.uint16)
        feedback = 0

        for i in range(self.ORDER):
            base = current_values[i] << 16

            for j in range(3):  # Process 3 frames of history
                base -= (history_values[j][i] * history_weights[j][i]) << 1

            intermediate = base & 0xFFFF

            base -= feedback

            feedback += (current_weights[i] * intermediate) << 1
            feedback >>= 16
            feedback += (current_weights[i] * (base >> 16)) << 1

            predicted_values[i] = feedback >> 13

        return predicted_values.astype(np.int16)

    # func_c9ce
    def _enforce_lsp_stability_constraints(
        self,
        lsp_params: NDArray[np.int16],  # ar2
    ) -> None:
        """
        Enforce stability constraints on LSP parameters:
        1. Ensure monotonic increase
        2. Apply minimum/maximum value constraints
        3. Ensure minimum spacing

        Args:
            lsp_params: LSP parameters to stabilize
        """
        # Step 1: Single pass to ensure monotonic increase
        for i in range(len(lsp_params) - 1):
            if lsp_params[i] > lsp_params[i + 1]:
                # Swap adjacent LSPs if out of order
                lsp_params[i], lsp_params[i + 1] = lsp_params[i + 1], lsp_params[i]

        # Step 2: Apply minimum value constraint to first LSP
        lsp_params[0] = max(lsp_params[0], self.MIN_LSP_VALUE)

        # Step 3: Ensure minimum spacing between adjacent LSPs
        # Uses original values as reference to prevent cascading shifts
        for i in range(len(lsp_params) - 1):
            min_next_value = lsp_params[i] + self.MIN_LSP_SPACING
            if lsp_params[i + 1] < min_next_value:
                lsp_params[i + 1] = min_next_value

        # Step 4: Apply maximum value constraint to last LSP
        lsp_params[-1] = min(lsp_params[-1], self.MAX_LSP_VALUE)

    # func_c7ee
    def _transform_lsp_to_frequency_domain(
        self, lsp_params: NDArray[np.int16]  # ar2
    ) -> NDArray[np.int16]:  # ar3
        """
        Transform LSP parameters from line-pair domain to frequency domain.

        Args:
            lsp_params: LSP parameters to transform

        Returns:
            Transformed frequency domain parameters
        """
        # Convert lookup tables to int64 to prevent overflow
        cos_table = (
            np.array(LSP_COSINE_LOOKUP_TABLE, dtype=np.uint16)
            .astype(np.int16)
            .astype(np.int64)
        )
        sin_table = (
            np.array(LSP_SINE_LOOKUP_TABLE, dtype=np.uint16)
            .astype(np.int16)
            .astype(np.int64)
        )

        scaled_values = np.zeros(self.ORDER, dtype=np.uint16)
        lsp_params = lsp_params.astype(np.uint64)

        for i in range(self.ORDER):
            # Initial scaling
            initial_product = (lsp_params[i] * self.LSP_SCALING) << 1

            # Calculate lookup table index and interpolation fraction
            table_index = initial_product >> 8
            table_index = min(table_index, self.MAX_TABLE_INDEX << 16) >> 16
            fraction = ((initial_product >> 16) & 0xFF) << 3

            # Linear interpolation between lookup table values
            base_value = cos_table[table_index] << 16
            scale_factor = sin_table[table_index]
            interpolation = (scale_factor * fraction).astype(np.int64) << 1

            scaled_values[i] = (base_value + interpolation).astype(np.int64) >> 16

        return scaled_values.astype(np.int16)

    # func_c9c2
    def _update_history(
        self,
        lsp_params: NDArray[np.int16],  # ar2
    ) -> None:
        """
        Update LSP parameter history by shifting and inserting new parameters.

        Args:
            lsp_params: New LSP parameters to add to history
        """
        # Shift history array and insert new parameters at index 0
        # self._lsp_history: ar3
        self._lsp_history = np.roll(self._lsp_history, 1, axis=0)
        self._lsp_history[0] = lsp_params.copy()

    # func_ce12
    def _generate_lsp_from_params(self, params: LspParameters):
        """
        Generate LSP parameters from codebook indices.

        Args:
            params: LspParameters containing codebook indices and voiced flag
        """
        # Get appropriate weights based on voiced flag
        is_voiced = params.voiced_flag == 1
        base_weights, history_weights = self._get_weights(is_voiced, "generation")

        # Initialize LSP parameters
        lsp_params = np.zeros(self.ORDER, dtype=np.int16)

        # Get values from codebooks and convert to int16
        main_codebook = np.array(LSP_MAIN_CODEBOOK, dtype=np.uint16).astype(np.int16)
        sub_codebook = np.array(LSP_SUB_CODEBOOK, dtype=np.uint16).astype(np.int16)

        main_value = main_codebook[params.main_index]
        sub_value_1 = sub_codebook[params.sub_index_1]
        sub_value_2 = sub_codebook[params.sub_index_2]

        # Combine main and sub values
        lsp_params[:5] = main_value[:5] + sub_value_1[:5]
        lsp_params[5:] = main_value[5:] + sub_value_2[5:]

        # Apply spacing adjustments
        # ar0 = 10, ar2 = 0x0060
        self._enforce_minimum_lsp_separation(lsp_params, 10)
        # ar0 = 5, ar2 = 0x0060
        self._enforce_minimum_lsp_separation(lsp_params, 5)

        # Apply smoothing filter
        # ar1 = 0x7BEC, ar2 = 0x0060, ar3 = 0xF205, ar4 = 0x6C8A, ar5 = 0xF1C9
        self._current_lsp_params = self._apply_smoothing_filter(
            lsp_params, base_weights, self._lsp_history, history_weights
        )

        # Update history with new parameters
        # ar2 = 0x0060, ar3 = 0x6C8A
        self._update_history(lsp_params)

        # Stabilize and store results
        # ar2 = 0x7BEC
        self._enforce_lsp_stability_constraints(self._current_lsp_params)

    # func_d071 Part 1
    def _generate_lsp_from_codebook(self, params: LspParameters) -> None:
        """
        Generate new LSP parameters from codebook indices.

        Args:
            params: LspParameters containing codebook indices and voiced flag
        """
        # ar1 = 0x7BEC, ar7 = 0x6C8A
        self._generate_lsp_from_params(params)
        # ar3(0x7BEC) -> ar2(0x6CB2)
        self._latest_lsp_params = self._current_lsp_params.copy()
        self._voiced_flag = params.voiced_flag == 1

    # func_d071 Part 2
    def _predict_lsp_parameters(self) -> None:
        """
        Predict LSP parameters based on history and current state.
        """
        # Get appropriate weights based on voiced flag
        base_weights, history_weights = self._get_weights(
            self._voiced_flag, "prediction"
        )

        # Copy latest parameters to current
        # ar2(0x6CB2) -> ar3(0x7BEC)
        self._current_lsp_params = self._latest_lsp_params.copy()

        # Calculate predicted parameters
        # ar1 = 0x6CB2, ar2 = 0x6C8A, ar3 = 0xF1E7, ar4 = 0xF223, ar5 = 0x7BE2
        predicted_lsp_params = self._predict_lsp_with_feedback(
            self._latest_lsp_params,
            base_weights,
            self._lsp_history,
            history_weights,
        )

        # Update history with predicted parameters
        # ar2 = 0x7BE2, ar3 = 0x6C8A
        self._update_history(predicted_lsp_params)

    # func_d071
    def process_frame(self, frame: Frame) -> np.ndarray:
        """
        Process a frame of speech data to generate or predict LSP parameters.

        Args:
            frame: Frame containing control bit and LSP parameters

        Returns:
            Frequency domain LSP parameters
        """
        if frame.control_bit == 0:
            # Generate new LSP from codebook
            self._generate_lsp_from_codebook(frame.lsp_params)
        else:
            # Predict LSP based on history
            self._predict_lsp_parameters()

        # Transform to frequency domain
        # ar2 = 0x7BEC, ar3 = 0x6F34
        self._current_frequency_domain_lsp_params = (
            self._transform_lsp_to_frequency_domain(self._latest_lsp_params)
        )

        return self._current_frequency_domain_lsp_params

コメント

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です