转载本文请注明出处:https://yudonglee.me/ctc-explained-part3  作者:yudonglee

本系列文章总共分为三部分来全面阐述CTC算法(本篇为Part 3):
Part 1:Training the Network(训练算法篇),介绍CTC理论原理,包括问题定义、公式推导、算法过程等。Part 1链接
Part 2:Decoding the Network(解码算法篇),介绍CTC Decoding的几种常用算法。Part 2链接

Part 3:CTC Demo by Speech Recognition(语音识别实战篇),基于 TensorFlow 实现完整的 CTC 语音识别系统,即本篇

在前两篇文章中,我们分别从理论层面深入剖析了 CTC 的训练算法和解码算法。理论固然重要,但”纸上得来终觉浅,绝知此事要躬行”。本篇将带领读者从零开始,基于 TensorFlow 2.x 搭建一个完整的端到端语音识别系统,将 Part 1 和 Part 2 中的理论知识落地为可运行的代码。

本篇的内容组织如下:

  1. 开发环境搭建
  2. 数据集介绍与准备
  3. 音频特征提取(MFCC)
  4. 数据预处理 Pipeline
  5. 模型架构设计(BiLSTM + CTC)
  6. CTC Loss 函数与训练过程
  7. CTC Beam Search 解码实现
  8. 模型评估与指标计算(CER / WER)
  9. 完整训练流程与实验结果
  10. 常见问题与调优建议
  11. 总结与展望

接下来,让我们一步步开始搭建。

1. 开发环境搭建

1.1 环境要求

本实战项目基于以下环境:

  • Python 3.8+
  • TensorFlow 2.10+(包含 `tf.nn.ctc_loss` 等 CTC 相关 API)
  • NumPy 1.21+
  • Librosa 0.9+(音频处理与特征提取)
  • SoundFile 0.10+(音频文件读取)
  • Matplotlib 3.5+(可视化)
  • tqdm(训练进度显示)

1.2 安装依赖

# 创建虚拟环境(推荐)
conda create -n ctc-asr python=3.8 -y
conda activate ctc-asr

# 安装核心依赖(TF 2.10+ 已统一 CPU/GPU 包,无需单独安装 tensorflow-gpu)
pip install tensorflow==2.10.0
pip install numpy librosa soundfile matplotlib tqdm

1.3 项目结构

ctc-speech-recognition/
├── data/
│   ├── raw/                  # 原始音频和标注文件
│   ├── processed/            # 预处理后的特征文件
│   └── vocab.json            # 字符集词表
├── src/
│   ├── config.py             # 超参数配置
│   ├── build_vocab.py        # 词表构建
│   ├── feature_extraction.py # 音频特征提取
│   ├── data_pipeline.py      # 数据加载与预处理
│   ├── model.py              # 模型定义
│   ├── train.py              # 训练脚本
│   ├── decode.py             # 解码模块
│   └── evaluate.py           # 评估脚本
├── notebooks/
│   └── visualization.ipynb   # 可视化分析
├── checkpoints/              # 模型检查点
├── logs/                     # TensorBoard 日志
├── main.py                   # 主入口
└── README.md

我们先从配置文件开始,定义项目中用到的所有超参数:

# src/config.py

"""
CTC 语音识别系统 - 超参数配置
"""

class Config:
    # ============ 音频特征参数 ============
    sample_rate = 16000          # 采样率(Hz)
    n_mfcc = 39                  # MFCC 特征维度(13维MFCC + 13维一阶差分 + 13维二阶差分)
    n_fft = 512                  # FFT 窗口大小
    hop_length = 160             # 帧移(10ms = 16000 * 0.01)
    win_length = 400             # 帧长(25ms = 16000 * 0.025)
    n_mels = 80                  # Mel 滤波器组数量

    # ============ 模型参数 ============
    rnn_units = 256              # LSTM 隐藏层单元数
    num_rnn_layers = 3           # LSTM 层数
    dropout_rate = 0.3           # Dropout 比例
    use_bidirectional = True     # 是否使用双向 LSTM

    # ============ 训练参数 ============
    batch_size = 32              # 批大小
    epochs = 100                 # 最大训练轮数
    learning_rate = 1e-3         # 初始学习率
    lr_decay_factor = 0.5       # 学习率衰减因子
    lr_decay_patience = 5        # 学习率衰减耐心值
    max_audio_length = 16.0      # 最大音频长度(秒)
    clip_grad_norm = 5.0         # 梯度裁剪阈值

    # ============ 解码参数 ============
    beam_width = 25              # Beam Search 束宽
    lm_weight = 0.0              # 语言模型权重(本篇暂不使用外部LM)
    length_bonus = 0.0           # 长度奖励系数

    # ============ 路径配置 ============
    data_dir = "data/"
    checkpoint_dir = "checkpoints/"
    log_dir = "logs/"

    # ============ 字符集参数 ============
    # blank 放在最后一个索引(index 27),这是 TensorFlow ctc_loss 的默认约定
    vocab_size = 28              # 26字母 + 1空格 + 1blank
    blank_index = 27             # blank 在字符集中的索引

这里有几个参数值得特别说明:

  • n_mfcc = 39:这是语音识别中最经典的特征维度配置。13 维 MFCC 基础系数 + 13 维一阶差分(delta)+ 13 维二阶差分(delta-delta),共 39 维。一阶差分捕捉特征的变化趋势,二阶差分捕捉变化的加速度,这些动态特征对识别准确率至关重要。
  • hop_length = 160:对应 10ms 的帧移(16000 Hz × 0.01s = 160 个采样点),这是语音识别中的标准设定。
  • clip_grad_norm = 5.0:CTC 训练初期梯度容易爆炸,梯度裁剪是必不可少的稳定手段。
  • blank_index = 27:将 blank 放在字符集的最后一个索引位置。这是 TensorFlow `tf.nn.ctc_loss` 的默认行为,避免了需要额外指定 `blank_index` 参数带来的兼容性问题。

2. 数据集介绍与准备

2.1 数据集选择

本实战选用 LibriSpeech 数据集作为示例,同时简要介绍 TIMIT 数据集供读者参考。

TIMIT(Texas Instruments / MIT):

  • 经典的英文语音识别基准数据集
  • 包含 6300 条语音,共约 5.4 小时
  • 630 位说话人,每人 10 句
  • 提供音素(phoneme)级别的时间对齐标注
  • 数据量小,适合快速实验和原型验证

LibriSpeech

  • 大规模英文有声书语音数据集
  • train-clean-100:约 100 小时干净语音
  • train-clean-360:约 360 小时干净语音
  • 提供文本转录,不提供时间对齐
  • 数据量大,适合训练更好的模型

本篇以 LibriSpeech train-clean-100 为主要示例,因为它无需时间对齐标注,与 CTC 的端到端训练范式完美契合。

2.2 下载数据集

# 下载 LibriSpeech train-clean-100(约 6.3GB)
wget https://www.openslr.org/resources/12/train-clean-100.tar.gz
tar -xzf train-clean-100.tar.gz -C data/raw/

# 下载测试集 test-clean(约 346MB)
wget https://www.openslr.org/resources/12/test-clean.tar.gz
tar -xzf test-clean.tar.gz -C data/raw/

2.3 数据集结构解析

LibriSpeech 的目录结构如下:

data/raw/LibriSpeech/
├── train-clean-100/
│   ├── 19/                    # 说话人 ID
│   │   ├── 198/               # 章节 ID
│   │   │   ├── 19-198.trans.txt       # 该章节的文本转录
│   │   │   ├── 19-198-0000.flac       # 音频文件 (FLAC格式)
│   │   │   ├── 19-198-0001.flac
│   │   │   └── ...
│   │   └── ...
│   └── ...
└── test-clean/
    └── ...

其中 `.trans.txt` 文件的格式为:

19-198-0000 NORTHANGER ABBEY
19-198-0001 THIS LITTLE WORK WAS FINISHED IN THE YEAR ...

每行的格式是 `{说话人ID}-{章节ID}-{句子ID} {转录文本}`。

2.4 构建字符集词表

CTC 模型的输出单元是字符级别的。我们需要构建一个字符集词表(vocabulary),将每个字符映射为一个整数索引。

# src/build_vocab.py

"""
构建字符集词表

CTC 的输出字符集 = 26个英文字母 + 空格 + blank
其中 blank 是 CTC 算法引入的特殊符号(详见 Part 1 第4节),
用于表示"当前时间步无输出",在映射函数 B 中会被移除。
"""

import json
import os
from src.config import Config


def build_vocabulary():
    """
    构建字符到索引的映射表。

    索引分配规则:
    - index 0:     空格 '<space>'
    - index 1-26:  字母 a-z
    - index 27:    CTC blank 符号

    重要:blank 放在最后一个索引(27),这是 TensorFlow tf.nn.ctc_loss
    的默认约定。如果 blank 放在 index 0,则需要显式传入 blank_index=0,
    且不同 TF 版本的兼容性可能存在问题。将 blank 放在最后可以避免这类隐患。
    """
    vocab = {
        '<space>': 0,   # 空格
    }

    # 26个英文小写字母,index 1-26
    for i, char in enumerate('abcdefghijklmnopqrstuvwxyz'):
        vocab[char] = i + 1

    # blank 放在最后,index 27
    vocab['<blank>'] = Config.blank_index  # 27

    # 反向映射:索引 -> 字符
    idx_to_char = {}
    for k, v in vocab.items():
        if k == '<space>':
            idx_to_char[v] = ' '
        elif k == '<blank>':
            idx_to_char[v] = ''      # blank 映射为空字符串(解码时跳过)
        else:
            idx_to_char[v] = k

    print(f"字符集大小: {len(vocab)}")
    print(f"字符集: {list(vocab.keys())}")
    print(f"blank 索引: {Config.blank_index}")

    # 保存词表
    os.makedirs("data/", exist_ok=True)
    with open("data/vocab.json", "w") as f:
        json.dump({
            "char_to_idx": vocab,
            "idx_to_char": {str(k): v for k, v in idx_to_char.items()},
            "vocab_size": len(vocab),   # 28
            "blank_index": Config.blank_index  # 27
        }, f, indent=2, ensure_ascii=False)

    return vocab, idx_to_char


if __name__ == "__main__":
    vocab, idx_to_char = build_vocabulary()
    # 输出:
    # 字符集大小: 28
    # 字符集: ['<space>', 'a', 'b', ..., 'z', '<blank>']
    # blank 索引: 27

这里的字符集设计有几点值得说明:

  1. 为什么用字符级别而非词级别? CTC 要求输入序列长度 >= 输出序列长度。如果使用词级别(词汇量可能上万),输出序列会很短,但每个位置的分类空间极大,模型难以学习。字符级别的输出序列虽然更长,但分类空间只有 28 个类别,学习难度大幅降低。
  2. blank 符号为什么放在最后一个索引?TensorFlow 的 `tf.nn.ctc_loss` 默认将最后一个索引视为 blank。将 blank 放在 index 27(最后一个位置),可以直接使用默认参数,无需额外指定 `blank_index`,兼容性最好。
  3. 没有使用标点符号:LibriSpeech 的转录文本都是大写字母且没有标点,我们统一转为小写后,字符集非常简洁。

3. 音频特征提取(MFCC)

3.1 MFCC 特征简介

MFCC(Mel-Frequency Cepstral Coefficients,梅尔频率倒谱系数)是语音识别中最经典、最广泛使用的声学特征。其计算流程如下:

原始音频波形
    ↓  预加重(Pre-emphasis)
    ↓  分帧(Framing):每帧 25ms,帧移 10ms
    ↓  加窗(Windowing):汉明窗
    ↓  快速傅里叶变换(FFT)
    ↓  Mel 滤波器组(Mel Filter Bank)
    ↓  取对数(Log)
    ↓  离散余弦变换(DCT)
    ↓
MFCC 特征(13维)
    ↓  一阶差分(Delta)→ 13维
    ↓  二阶差分(Delta-Delta)→ 13维
    ↓
最终特征(39维)

MFCC 的设计模拟了人耳的听觉特性——Mel 滤波器组在低频区分辨率高、高频区分辨率低,这与人耳对低频更敏感的生理特性一致。

3.2 特征提取代码

# src/feature_extraction.py

"""
音频特征提取模块

将原始音频波形转换为 MFCC 特征序列,供模型输入使用。
每一帧(10ms)的音频生成一个 39 维的特征向量,
因此一段 T 秒的音频会生成约 T × 100 帧的特征序列。

回顾 Part 1 第3节中的定义:
- 输入序列 x = (x_1, x_2, ..., x_T),其中每个 x_t 是一个 39 维向量
- T = 音频时长(秒)× 100(因为帧移为 10ms)
"""

import numpy as np
import librosa
import soundfile as sf
from src.config import Config


def load_audio(audio_path, target_sr=Config.sample_rate):
    """
    加载音频文件并重采样到目标采样率。

    Args:
        audio_path: 音频文件路径(支持 .wav, .flac 等格式)
        target_sr: 目标采样率,默认 16000 Hz

    Returns:
        audio: 一维 numpy 数组,归一化到 [-1, 1]
        sr: 采样率
    """
    audio, sr = sf.read(audio_path)

    # 如果是多声道,取第一个声道
    if len(audio.shape) > 1:
        audio = audio[:, 0]

    # 重采样
    if sr != target_sr:
        audio = librosa.resample(audio, orig_sr=sr, target_sr=target_sr)
        sr = target_sr

    # 归一化
    max_val = np.max(np.abs(audio))
    if max_val > 0:
        audio = audio / max_val

    return audio, sr


def extract_mfcc(audio, sr=Config.sample_rate):
    """
    从音频波形中提取 MFCC 特征。

    提取过程:
    1. 计算 13 维 MFCC 基础系数
    2. 计算一阶差分(delta),捕捉特征随时间的变化速率
    3. 计算二阶差分(delta-delta),捕捉变化的加速度
    4. 拼接三者得到 39 维特征向量

    Args:
        audio: 一维音频波形数组
        sr: 采样率

    Returns:
        features: numpy 数组,shape = (time_steps, 39)
                  其中 time_steps ≈ 音频时长(秒) × 100
    """
    # Step 1: 计算 13 维 MFCC
    mfcc = librosa.feature.mfcc(
        y=audio,
        sr=sr,
        n_mfcc=13,              # 取前 13 个 MFCC 系数
        n_fft=Config.n_fft,     # FFT 窗口大小
        hop_length=Config.hop_length,  # 帧移:160个采样点 = 10ms
        win_length=Config.win_length,  # 帧长:400个采样点 = 25ms
        n_mels=Config.n_mels    # Mel 滤波器数量
    )
    # mfcc.shape = (13, time_steps)

    # Step 2: 计算一阶差分(Delta)
    # Delta 特征反映了 MFCC 在时间维度上的变化趋势
    delta = librosa.feature.delta(mfcc, order=1)

    # Step 3: 计算二阶差分(Delta-Delta)
    # 反映变化的加速度,对于捕捉语音的动态特性很重要
    delta2 = librosa.feature.delta(mfcc, order=2)

    # Step 4: 拼接三者
    features = np.concatenate([mfcc, delta, delta2], axis=0)
    # features.shape = (39, time_steps)

    # 转置为 (time_steps, 39),符合 RNN 输入格式:(序列长度, 特征维度)
    features = features.T

    # Step 5: 特征标准化(零均值,单位方差)
    # 这一步对模型训练非常重要,可以加速收敛并提升最终效果
    mean = np.mean(features, axis=0)
    std = np.std(features, axis=0)
    features = (features - mean) / (std + 1e-8)  # 加 epsilon 防止除零

    return features.astype(np.float32)


def extract_features_from_file(audio_path):
    """
    从音频文件直接提取特征的便捷函数。

    Args:
        audio_path: 音频文件路径

    Returns:
        features: numpy 数组,shape = (time_steps, 39)
    """
    audio, sr = load_audio(audio_path)

    # 限制最大音频长度,避免内存溢出
    max_samples = int(Config.max_audio_length * sr)
    if len(audio) > max_samples:
        audio = audio[:max_samples]

    features = extract_mfcc(audio, sr)
    return features


# ============ 可视化辅助函数 ============

def plot_features(audio_path, save_path=None):
    """
    可视化音频波形和 MFCC 特征,帮助直观理解特征提取过程。
    """
    import matplotlib.pyplot as plt

    audio, sr = load_audio(audio_path)
    features = extract_mfcc(audio, sr)

    fig, axes = plt.subplots(3, 1, figsize=(14, 10))

    # 1. 原始波形
    time_axis = np.arange(len(audio)) / sr
    axes[0].plot(time_axis, audio, linewidth=0.5)
    axes[0].set_title("Raw Waveform")
    axes[0].set_xlabel("Time (s)")
    axes[0].set_ylabel("Amplitude")

    # 2. MFCC 特征(前13维)
    axes[1].imshow(features[:, :13].T, aspect='auto', origin='lower', cmap='viridis')
    axes[1].set_title("MFCC Features (13-dim)")
    axes[1].set_xlabel("Frame Index")
    axes[1].set_ylabel("MFCC Coefficient")

    # 3. 完整 39 维特征
    axes[2].imshow(features.T, aspect='auto', origin='lower', cmap='viridis')
    axes[2].set_title("Full Features (39-dim: MFCC + Delta + Delta-Delta)")
    axes[2].set_xlabel("Frame Index")
    axes[2].set_ylabel("Feature Dimension")

    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, dpi=150)
    plt.show()

    print(f"音频时长: {len(audio)/sr:.2f}s")
    print(f"特征维度: {features.shape}")
    print(f"帧数 (time_steps): {features.shape[0]}")

3.3 特征提取效果验证

# 快速验证
features = extract_features_from_file("data/raw/LibriSpeech/train-clean-100/19/198/19-198-0000.flac")
print(f"特征形状: {features.shape}")
# 输出示例: 特征形状: (547, 39)
# 说明这段音频生成了 547 帧,每帧 39 维特征

# 可视化
plot_features("data/raw/LibriSpeech/train-clean-100/19/198/19-198-0000.flac")

4. 数据预处理 Pipeline

4.1 数据解析与文本编码

# src/data_pipeline.py

"""
数据加载与预处理 Pipeline

负责:
1. 解析 LibriSpeech 的目录结构和标注文件
2. 将文本转录转为整数索引序列
3. 构建 tf.data.Dataset,实现高效的数据加载与动态 padding
"""

import os
import json
import numpy as np
import tensorflow as tf
from src.config import Config
from src.feature_extraction import extract_features_from_file


def parse_librispeech(data_dir, split="train-clean-100"):
    """
    解析 LibriSpeech 数据集,返回 (音频路径, 转录文本) 列表。

    Args:
        data_dir: 数据根目录
        split: 数据集分割名称

    Returns:
        samples: [(audio_path, transcript), ...] 的列表
    """
    split_dir = os.path.join(data_dir, "raw", "LibriSpeech", split)
    samples = []

    for speaker_id in sorted(os.listdir(split_dir)):
        speaker_dir = os.path.join(split_dir, speaker_id)
        if not os.path.isdir(speaker_dir):
            continue

        for chapter_id in sorted(os.listdir(speaker_dir)):
            chapter_dir = os.path.join(speaker_dir, chapter_id)
            if not os.path.isdir(chapter_dir):
                continue

            # 读取转录文件
            trans_file = os.path.join(
                chapter_dir, f"{speaker_id}-{chapter_id}.trans.txt"
            )
            if not os.path.exists(trans_file):
                continue

            with open(trans_file, "r") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    # 格式: "19-198-0000 NORTHANGER ABBEY"
                    parts = line.split(" ", 1)
                    if len(parts) < 2:
                        continue
                    utt_id = parts[0]
                    transcript = parts[1].lower()  # 统一转为小写

                    audio_path = os.path.join(chapter_dir, f"{utt_id}.flac")
                    if os.path.exists(audio_path):
                        samples.append((audio_path, transcript))

    print(f"[{split}] 共解析到 {len(samples)} 条样本")
    return samples


def text_to_indices(text, char_to_idx):
    """
    将文本转录转为整数索引序列。

    回顾 Part 1 第3节:
    - 输出标签 z = (z_1, z_2, ..., z_U)
    - 每个 z_u 是字符集中的一个字符
    - 这里我们将字符映射为整数索引,方便模型处理

    例如:
        text = "hello world"
        输出 = [8, 5, 12, 12, 15, 0, 23, 15, 18, 12, 4]
        其中 0 代表空格,其他数字代表对应的字母
    """
    indices = []
    for char in text:
        if char == ' ':
            indices.append(char_to_idx['<space>'])
        elif char in char_to_idx:
            indices.append(char_to_idx[char])
        # 忽略不在字符集中的字符(如标点)
    return indices


def indices_to_text(indices, idx_to_char):
    """
    将整数索引序列转回文本(解码时使用)。

    注意:
    - blank 索引(27)会被映射为空字符串(跳过)
    - 空格索引(0)会被映射为空格
    """
    chars = []
    for idx in indices:
        idx_key = int(idx)
        char = idx_to_char.get(idx_key, '')
        if char:  # 跳过 blank(映射为空字符串)
            chars.append(char)
    return ''.join(chars)

4.2 构建 tf.data.Dataset

tf.data.Dataset 是 TensorFlow 推荐的数据加载方式,它支持数据的惰性加载、预取(prefetch)、动态填充(padded_batch)等高效操作。这在处理变长音频数据时尤为重要。

# src/data_pipeline.py(续)

def create_dataset(samples, char_to_idx, is_training=True):
    """
    构建 tf.data.Dataset。

    CTC 训练需要以下四个输入:
    1. features:       音频特征序列,shape = (batch, max_time, 39)
    2. labels:         文本索引序列,shape = (batch, max_label_len)
    3. feature_lengths: 每条样本的实际特征帧数,shape = (batch,)
    4. label_lengths:   每条样本的实际标签长度,shape = (batch,)

    其中 feature_lengths 和 label_lengths 对于 CTC Loss 的计算至关重要——
    它们告诉 CTC 算法每条样本的有效序列长度是多少,
    padding 部分不应参与 Loss 计算。
    """

    def _process_sample(audio_path, transcript):
        """处理单条样本:提取特征 + 文本编码"""
        # 提取 MFCC 特征
        features = extract_features_from_file(audio_path.numpy().decode('utf-8'))

        # 文本转索引
        text = transcript.numpy().decode('utf-8')
        label = text_to_indices(text, char_to_idx)
        label = np.array(label, dtype=np.int32)

        feature_length = np.int32(features.shape[0])
        label_length = np.int32(len(label))

        return features, label, feature_length, label_length

    def tf_process_sample(audio_path, transcript):
        """tf.py_function 封装,使 numpy 操作能在 tf.data 管线中运行"""
        features, label, feat_len, label_len = tf.py_function(
            func=_process_sample,
            inp=[audio_path, transcript],
            Tout=[tf.float32, tf.int32, tf.int32, tf.int32]
        )
        # 设置形状信息(tf.py_function 会丢失形状信息)
        features.set_shape([None, Config.n_mfcc])
        label.set_shape([None])
        feat_len.set_shape([])
        label_len.set_shape([])
        return features, label, feat_len, label_len

    # 构建数据集
    audio_paths = [s[0] for s in samples]
    transcripts = [s[1] for s in samples]

    dataset = tf.data.Dataset.from_tensor_slices((audio_paths, transcripts))

    if is_training:
        dataset = dataset.shuffle(buffer_size=min(len(samples), 10000))

    dataset = dataset.map(
        tf_process_sample,
        num_parallel_calls=tf.data.AUTOTUNE  # 并行处理加速
    )

    # 过滤掉异常样本(特征帧数必须 > 标签长度,这是 CTC 的基本约束)
    dataset = dataset.filter(
        lambda feat, lab, fl, ll: tf.greater(fl, ll)
    )

    # 动态 padding + 分批
    # padding_values 中标签用 blank_index 填充(不会影响 CTC Loss 计算,
    # 因为 label_lengths 已经告诉了 CTC 有效标签的长度)
    dataset = dataset.padded_batch(
        batch_size=Config.batch_size,
        padded_shapes=(
            [None, Config.n_mfcc],  # features: 填充时间维度
            [None],                  # labels: 填充到最长标签
            [],                      # feature_length: 标量,不填充
            []                       # label_length: 标量,不填充
        ),
        padding_values=(
            0.0,                       # 特征用 0.0 填充
            np.int32(Config.blank_index),  # 标签用 blank 索引填充
            np.int32(0),               # feature_length
            np.int32(0)                # label_length
        ),
        drop_remainder=is_training  # 训练时丢弃不完整的最后一个 batch
    )

    dataset = dataset.prefetch(tf.data.AUTOTUNE)  # 预取加速
    return dataset

这段代码中有几个关键的设计决策:

  1. filter 操作:`fl > ll` 确保输入序列长度大于输出序列长度。回顾 Part 1 第 3 节,这是 CTC 的基本约束条件——如果输入帧数不够标签字符数多,CTC 无法为每个字符分配至少一个时间步。
  2. padded_batch:这是处理变长序列的标准做法。不同音频的特征帧数不同,需要 padding 到同一长度才能组成 batch。但 padding 部分不参与 CTC Loss 的计算,这就是为什么我们需要传入 `feature_lengths` 和 `label_lengths`。
  3. prefetch:在 GPU 训练当前 batch 的同时,CPU 预取下一个 batch 的数据,实现流水线并行,大幅提升训练吞吐量。

5. 模型架构设计(BiLSTM + CTC)

5.1 架构概览

我们采用经典的 BiLSTM + FC + CTC 架构。这也是 Alex Graves 在 CTC 原始论文及后续工作中使用的架构。

输入特征: (batch, T, 39)
        ↓
  [BiLSTM Layer 1]  →  (batch, T, 512)    # 256 forward + 256 backward
        ↓  Dropout
  [BiLSTM Layer 2]  →  (batch, T, 512)
        ↓  Dropout
  [BiLSTM Layer 3]  →  (batch, T, 512)
        ↓  Dropout
  [Dense Layer]     →  (batch, T, 28)     # 投影到字符集大小
        ↓
  [CTC Loss / CTC Decode]                 # 注意:这里没有 Softmax 层!

回顾 Part 1 第 4 节和第 5 节:

  1. RNN(这里是 BiLSTM)在每个时间步 t 输出一个概率向量 y_t,维度等于扩展字符集大小(26 字母 + 空格 + blank = 28)
  2. y_t^k 表示时间步 t 输出字符 k 的概率
  3. CTC Loss 通过累加所有合法路径的概率来计算

5.2 为什么选择双向 LSTM?

双向 LSTM(BiLSTM)包含前向和后向两个方向的 LSTM,能够同时利用过去和未来的上下文信息。在语音识别中,一个音素的发音往往受到前后音素的影响(协同发音现象),因此双向结构能显著提升识别效果。

在非流式(offline)语音识别场景中,整条音频是可以提前获取的,因此使用双向结构没有问题。但在流式(streaming / online)场景中,只能使用单向 LSTM 或带有限前看(limited lookahead)的结构。

5.3 模型代码

# src/model.py

"""
CTC 语音识别模型定义

架构:多层 BiLSTM + 全连接层

回顾 Part 1 第 5 节中的模型结构图:
- RNN 部分:本实现使用 3 层 BiLSTM(比原论文中的单层更深)
- 输出层:Dense(vocab_size),输出 logits(不加 Softmax!)
- 训练时:CTC Loss 内部自行做 Softmax
- 解码时:CTC Decode 内部自行做 Softmax
"""

import tensorflow as tf
from src.config import Config


def build_ctc_model(vocab_size=Config.vocab_size, input_dim=Config.n_mfcc):
    """
    构建 CTC 语音识别模型。

    Args:
        vocab_size: 字符集大小(包括 blank),本实验为 28
        input_dim: 输入特征维度,默认 39

    Returns:
        model: tf.keras.Model 实例
    """

    # ============ 输入层 ============
    input_features = tf.keras.Input(
        shape=(None, input_dim),
        dtype=tf.float32,
        name="input_features"
    )

    x = input_features

    # ============ BiLSTM 层 ============
    for i in range(Config.num_rnn_layers):
        # return_sequences=True: 返回每个时间步的输出,而非仅最后一步
        # 这对 CTC 至关重要——CTC 需要每个时间步都有输出!
        #
        # 注意:LSTM 的 dropout 参数只在 training=True 时生效,
        # 这里不额外加 Dropout 层,避免双重 dropout。
        lstm = tf.keras.layers.LSTM(
            units=Config.rnn_units,
            return_sequences=True,
            dropout=Config.dropout_rate,    # 输入到隐藏层的 dropout
            recurrent_dropout=0,            # 设为 0 以兼容 cuDNN 加速
            name=f"lstm_{i}"
        )

        if Config.use_bidirectional:
            x = tf.keras.layers.Bidirectional(
                lstm,
                merge_mode='concat',   # 前向和后向的输出拼接
                name=f"bilstm_{i}"
            )(x)
            # 拼接后维度: (batch, time_steps, rnn_units * 2) = (batch, T, 512)
        else:
            x = lstm(x)
            # 维度: (batch, time_steps, rnn_units) = (batch, T, 256)

    # ============ 输出层 ============
    # 全连接层:将 LSTM 输出投影到字符集大小
    # 注意:输出 logits(无激活函数),不加 Softmax!
    # tf.nn.ctc_loss 和 tf.nn.ctc_beam_search_decoder 都接受 logits 输入,
    # 内部自行做 log_softmax / softmax。
    # 如果这里多加一层 Softmax,等于做了两次 Softmax,训练将完全不收敛!
    logits = tf.keras.layers.Dense(
        vocab_size,
        activation=None,        # 输出 logits
        name="output_dense"
    )(x)
    # logits.shape = (batch, time_steps, vocab_size)

    model = tf.keras.Model(
        inputs=input_features,
        outputs=logits,
        name="ctc_speech_recognition"
    )

    return model


# ============ 快速测试 ============
if __name__ == "__main__":
    model = build_ctc_model()
    model.summary()

    # 模拟输入测试
    dummy_input = tf.random.normal((2, 300, 39))  # batch=2, 300帧, 39维
    output = model(dummy_input, training=False)
    print(f"\n输入形状: {dummy_input.shape}")
    print(f"输出形状: {output.shape}")
    # 预期输出: (2, 300, 28)
    # 解释: 每个时间步输出一个 28 维的 logits 向量,
    #        对应 28 个字符(含 blank)的未归一化分数

5.4 关于模型输出 logits 的重要说明

模型输出的是 **logits**(未经 Softmax 的原始分数),而不是概率。这一点极其重要:

  • 训练时:`tf.nn.ctc_loss` 内部会自动对 logits 做 log_softmax,然后计算 CTC Loss。
  • 解码时:`tf.nn.ctc_beam_search_decoder` 同样接受 logits 输入,内部处理 softmax。
  • 常见 bug:如果在模型中额外加了 Softmax 层,等于对概率做了两次 Softmax,训练将完全不收敛,但不会报任何错误。这是 CTC 实现中最隐蔽的陷阱之一。

6. CTC Loss 函数与训练过程

6.1 CTC Loss 的调用

TensorFlow 已经内置了 CTC Loss 的高效实现(基于 Part 1 第 6 节中介绍的 Forward-Backward 动态规划算法),我们可以直接调用:

# src/train.py

"""
CTC 模型训练模块

核心流程:
1. 模型前向传播 → 得到 logits
2. 计算 CTC Loss(TF 内部使用 Forward-Backward 算法)
3. 反向传播 → 计算梯度(对应 Part 1 第 7 节的 CTC Loss 求导)
4. 梯度裁剪 → 更新参数
"""

import os
import time
import numpy as np
import tensorflow as tf
from tqdm import tqdm
from src.config import Config
from src.model import build_ctc_model
from src.data_pipeline import parse_librispeech, create_dataset, indices_to_text
from src.decode import ctc_greedy_decode
from src.evaluate import compute_cer, compute_wer


def compute_ctc_loss(logits, labels, logit_lengths, label_lengths):
    """
    计算 CTC Loss。

    这里直接调用 TensorFlow 的 tf.nn.ctc_loss,其内部实现了
    Part 1 第 6 节中的 Forward-Backward 动态规划算法:

    1. 对 logits 做 log_softmax 得到每个时间步的对数概率分布
    2. 构建扩展标签序列 l'(插入 blank)
    3. 前向传播:计算前向概率 alpha(s, t)
    4. 后向传播:计算后向概率 beta(s, t)
    5. 综合前向和后向概率计算 p(z|x)
    6. 取负对数得到 CTC Loss

    Args:
        logits:         模型输出,shape = (batch, max_time, vocab_size)
        labels:         标签索引,shape = (batch, max_label_len)
        logit_lengths:  每条样本的有效帧数,shape = (batch,)
        label_lengths:  每条样本的有效标签长度,shape = (batch,)

    Returns:
        loss: 标量,batch 内所有样本的平均 CTC Loss
    """
    # tf.nn.ctc_loss 要求 logits 的时间维度在第一维(当 logits_time_major=True 时)
    # 转置: (batch, time, vocab) -> (time, batch, vocab)
    logits_transposed = tf.transpose(logits, perm=[1, 0, 2])

    # 调用 CTC Loss
    # blank_index 默认为 vocab_size - 1(即最后一个索引 = 27),
    # 与我们的词表设计一致,因此无需额外指定。
    loss = tf.nn.ctc_loss(
        labels=labels,
        logits=logits_transposed,
        label_length=label_lengths,
        logit_length=logit_lengths,
        logits_time_major=True,       # logits 的第一维是时间
    )
    # loss.shape = (batch,),每条样本一个 loss 值

    # 返回 batch 平均 loss
    return tf.reduce_mean(loss)

6.2 训练循环

# src/train.py(续)

class CTCTrainer:
    """CTC 模型训练器"""

    def __init__(self, vocab_size=Config.vocab_size):
        self.model = build_ctc_model(vocab_size)
        self.optimizer = tf.keras.optimizers.Adam(
            learning_rate=Config.learning_rate
        )
        self.vocab_size = vocab_size

        # TensorBoard 日志
        os.makedirs(Config.log_dir, exist_ok=True)
        self.train_summary_writer = tf.summary.create_file_writer(
            os.path.join(Config.log_dir, "train")
        )
        self.val_summary_writer = tf.summary.create_file_writer(
            os.path.join(Config.log_dir, "val")
        )

    @tf.function(
        input_signature=[
            tf.TensorSpec(shape=[None, None, Config.n_mfcc], dtype=tf.float32),
            tf.TensorSpec(shape=[None, None], dtype=tf.int32),
            tf.TensorSpec(shape=[None], dtype=tf.int32),
            tf.TensorSpec(shape=[None], dtype=tf.int32),
        ]
    )
    def train_step(self, features, labels, feature_lengths, label_lengths):
        """
        单步训练(使用 @tf.function 编译为计算图加速)。

        对应 Part 1 第 7 节的完整流程:
        1. 前向传播 → logits
        2. CTC Loss 计算(Forward-Backward 算法)
        3. 反向传播 → 梯度计算
        4. 梯度裁剪 → 防止梯度爆炸
        5. 参数更新
        """
        with tf.GradientTape() as tape:
            # Step 1: 前向传播
            logits = self.model(features, training=True)

            # Step 2: 计算 CTC Loss
            loss = compute_ctc_loss(
                logits, labels, feature_lengths, label_lengths
            )

        # Step 3: 计算梯度
        gradients = tape.gradient(loss, self.model.trainable_variables)

        # Step 4: 梯度裁剪
        # CTC 训练初期梯度非常不稳定,裁剪是必需的
        gradients, grad_norm = tf.clip_by_global_norm(
            gradients, Config.clip_grad_norm
        )

        # Step 5: 更新参数
        self.optimizer.apply_gradients(
            zip(gradients, self.model.trainable_variables)
        )

        return loss, grad_norm

    @tf.function(
        input_signature=[
            tf.TensorSpec(shape=[None, None, Config.n_mfcc], dtype=tf.float32),
            tf.TensorSpec(shape=[None, None], dtype=tf.int32),
            tf.TensorSpec(shape=[None], dtype=tf.int32),
            tf.TensorSpec(shape=[None], dtype=tf.int32),
        ]
    )
    def val_step(self, features, labels, feature_lengths, label_lengths):
        """验证步骤(不计算梯度,不更新参数)"""
        logits = self.model(features, training=False)
        loss = compute_ctc_loss(
            logits, labels, feature_lengths, label_lengths
        )
        return loss

    def train(self, train_dataset, val_dataset, idx_to_char):
        """
        完整训练流程。

        Args:
            train_dataset: 训练集 tf.data.Dataset
            val_dataset: 验证集 tf.data.Dataset
            idx_to_char: 索引到字符的映射表
        """
        best_val_loss = float('inf')
        no_improve_count = 0

        for epoch in range(Config.epochs):
            start_time = time.time()

            # ======== 训练阶段 ========
            train_losses = []
            for batch in tqdm(train_dataset, desc=f"Epoch {epoch+1}/{Config.epochs} [Train]"):
                features, labels, feat_lens, label_lens = batch
                loss, grad_norm = self.train_step(
                    features, labels, feat_lens, label_lens
                )
                train_losses.append(loss.numpy())

            avg_train_loss = np.mean(train_losses)

            # ======== 验证阶段 ========
            val_losses = []
            for batch in tqdm(val_dataset, desc=f"Epoch {epoch+1}/{Config.epochs} [Val]"):
                features, labels, feat_lens, label_lens = batch
                loss = self.val_step(
                    features, labels, feat_lens, label_lens
                )
                val_losses.append(loss.numpy())

            avg_val_loss = np.mean(val_losses)

            # ======== 日志记录 ========
            elapsed = time.time() - start_time
            current_lr = self.optimizer.learning_rate.numpy()
            print(f"\nEpoch {epoch+1}/{Config.epochs} - "
                  f"Train Loss: {avg_train_loss:.4f} - "
                  f"Val Loss: {avg_val_loss:.4f} - "
                  f"LR: {current_lr:.6f} - "
                  f"Time: {elapsed:.1f}s")

            # TensorBoard 日志
            with self.train_summary_writer.as_default():
                tf.summary.scalar('loss', avg_train_loss, step=epoch)
            with self.val_summary_writer.as_default():
                tf.summary.scalar('loss', avg_val_loss, step=epoch)

            # ======== 模型保存与学习率衰减 ========
            if avg_val_loss < best_val_loss:
                best_val_loss = avg_val_loss
                no_improve_count = 0
                self.save_checkpoint(epoch, avg_val_loss)
                print(f"  -> 保存最优模型 (val_loss: {avg_val_loss:.4f})")
            else:
                no_improve_count += 1
                print(f"  -> 验证集 loss 未改善 ({no_improve_count}/{Config.lr_decay_patience})")

                # 学习率衰减
                if no_improve_count >= Config.lr_decay_patience:
                    old_lr = current_lr
                    new_lr = max(old_lr * Config.lr_decay_factor, 1e-6)
                    self.optimizer.learning_rate.assign(new_lr)
                    no_improve_count = 0
                    print(f"  -> 学习率衰减: {old_lr:.6f} -> {new_lr:.6f}")

            # ======== 每 5 个 epoch 做一次解码示例 ========
            if (epoch + 1) % 5 == 0:
                self._decode_samples(val_dataset, idx_to_char, num_samples=3)

            # ======== Early Stopping ========
            if current_lr <= 1e-6 and no_improve_count >= Config.lr_decay_patience:
                print(f"\nEarly stopping at epoch {epoch+1}")
                break

        print(f"\n训练完成!最优验证集 Loss: {best_val_loss:.4f}")

    def _decode_samples(self, dataset, idx_to_char, num_samples=3):
        """解码几条验证样本,直观观察模型效果"""
        print("\n  ---- 解码示例 ----")
        for batch in dataset.take(1):
            features, labels, feat_lens, label_lens = batch
            logits = self.model(features, training=False)

            for j in range(min(num_samples, features.shape[0])):
                # Greedy Decode
                pred_list = ctc_greedy_decode(
                    logits[j:j+1], feat_lens[j:j+1], idx_to_char
                )
                # 真实标签
                true_label = indices_to_text(
                    labels[j][:label_lens[j]].numpy().tolist(), idx_to_char
                )
                print(f"  [样本 {j+1}]")
                print(f"    真实: {true_label}")
                print(f"    预测: {pred_list[0]}")
        print("  ------------------\n")

    def save_checkpoint(self, epoch, val_loss):
        """保存模型检查点"""
        os.makedirs(Config.checkpoint_dir, exist_ok=True)
        path = os.path.join(
            Config.checkpoint_dir,
            f"model_epoch{epoch+1}_loss{val_loss:.4f}.h5"
        )
        self.model.save_weights(path)

    def load_checkpoint(self, path):
        """加载模型检查点"""
        self.model.load_weights(path)
        print(f"已加载模型: {path}")

6.3 关于 CTC 训练的几个重要注意事项

  1. 梯度裁剪(Gradient Clipping)是必需的:CTC 训练初期,由于前向概率和后向概率涉及大量概率值的乘积,数值容易变得极端(非常大或非常小),导致梯度爆炸。`clip_by_global_norm` 将梯度向量的全局范数限制在指定阈值内,是确保训练稳定的关键手段。
  2. 学习率不宜过大:与常规分类任务不同,CTC 的 loss landscape 更加复杂。建议从 1e-3 或 1e-4 开始,配合学习率衰减策略。如果训练初期 loss 为 NaN 或 Inf,通常是学习率过大导致的。
  3. blank_index 必须与词表一致:本实现中 blank 位于 index 27(字符集的最后一个位置),这与 TensorFlow `tf.nn.ctc_loss` 的默认行为一致(默认将 `num_classes – 1` 视为 blank)。如果你的词表将 blank 放在 index 0,则必须在 `tf.nn.ctc_loss` 中显式传入 `blank_index=0`,否则训练会完全不收敛但不报任何错误——这是一个极其隐蔽的 bug。
  4. 输入长度必须大于输出长度:这是 CTC 的硬约束。在 data_pipeline 中我们已经用 `filter` 过滤掉了不满足条件的样本,但在训练中仍需注意:如果某条音频特别短而标签特别长,`tf.nn.ctc_loss` 会返回 Inf 或 NaN。

7. CTC 解码实现

7.1 Greedy Decoding(贪心解码)

回顾 Part 2 中的分析,贪心解码在每个时间步选择概率最大的字符,然后通过映射函数 B(去重 + 去 blank)得到最终标签。虽然不是最优解,但实现简单,常用于训练过程中的快速效果检验。

# src/decode.py

"""
CTC 解码模块

实现了三种解码算法:
1. Greedy Decoding(贪心解码)- 对应 Part 2 中的 Best Path Decoding
2. TensorFlow 内置 Beam Search
3. 自定义 Beam Search(带更详细的过程输出)
"""

import numpy as np
import tensorflow as tf
from src.config import Config


def ctc_greedy_decode(logits, logit_lengths, idx_to_char):
    """
    CTC 贪心解码。

    对应 Part 2 中介绍的 Best Path Decoding:
    1. 在每个时间步选择概率最大的字符 → 得到一条路径 pi
    2. 通过映射函数 B 对路径做去重和去 blank → 得到最终标签 z

    映射函数 B 的具体操作(回顾 Part 1 第 4 节):
    - 先合并相邻的重复字符(如 "aabcc" → "abc")
    - 再去除路径中所有的 blank 符号

    Args:
        logits:         模型输出,shape = (batch, time, vocab_size)
        logit_lengths:  有效帧数,shape = (batch,)
        idx_to_char:    索引到字符的映射(int -> str)

    Returns:
        results: 解码结果字符串列表
    """
    blank_index = Config.blank_index

    # Step 1: 在每个时间步取 argmax → 得到路径
    predictions = tf.argmax(logits, axis=-1).numpy()

    results = []
    for b in range(predictions.shape[0]):
        length = logit_lengths[b] if hasattr(logit_lengths, '__getitem__') else logit_lengths.numpy()
        if isinstance(length, np.ndarray):
            length = int(length[b]) if length.ndim > 0 else int(length)
        else:
            length = int(length)

        seq = predictions[b][:length]  # 只取有效部分

        # Step 2: 映射函数 B — 去重 + 去 blank
        decoded = []
        prev_idx = -1
        for idx in seq:
            idx = int(idx)
            if idx != blank_index and idx != prev_idx:
                decoded.append(idx)
            prev_idx = idx

        # 转为文本
        text = ''
        for idx in decoded:
            char = idx_to_char.get(idx, '')
            text += char

        results.append(text)

    return results

7.2 TensorFlow 内置 Beam Search

# src/decode.py(续)

def ctc_beam_search_decode_tf(logits, logit_lengths, idx_to_char,
                               beam_width=Config.beam_width):
    """
    使用 TensorFlow 内置的 CTC Beam Search 解码。

    对应 Part 2 第 2 节中介绍的 CTC Beam Search Decoding 算法。
    TF 内部实现了完整的 Beam Search 流程,包括 gamma_b / gamma_nb 的递推。

    Args:
        logits:         模型输出,shape = (batch, time, vocab_size)
        logit_lengths:  有效帧数,shape = (batch,)
        idx_to_char:    索引到字符的映射
        beam_width:     束宽 W

    Returns:
        results: 解码结果字符串列表
    """
    # 转置为 time-major: (time, batch, vocab)
    logits_t = tf.transpose(logits, perm=[1, 0, 2])

    # 调用 TF 内置的 Beam Search 解码
    decoded_sparse_list, log_probs = tf.nn.ctc_beam_search_decoder(
        inputs=logits_t,
        sequence_length=logit_lengths,
        beam_width=beam_width,
        top_paths=1           # 只返回概率最高的 1 条路径
    )

    # decoded_sparse_list[0] 是 SparseTensor,转为 dense
    decoded_dense = tf.sparse.to_dense(
        decoded_sparse_list[0], default_value=-1
    ).numpy()

    results = []
    for b in range(decoded_dense.shape[0]):
        text = ''
        for idx in decoded_dense[b]:
            idx = int(idx)
            if idx < 0:  # padding(-1)
                break
            # 注意:ctc_beam_search_decoder 返回的索引已经去除了 blank,
            # 并且索引范围是 [0, num_classes-2](不包含 blank)。
            # 由于我们的 blank 在最后一个位置(index 27),
            # 返回的索引 0-26 与我们的 char 索引一一对应。
            char = idx_to_char.get(idx, '')
            text += char
        results.append(text)

    return results

7.3 自定义 Beam Search(完整实现)

为了更好地理解 Part 2 中的算法原理,我们手动实现一个 CTC Beam Search 解码器:

# src/decode.py(续)

def ctc_beam_search_decode_custom(logits_single, logit_length, idx_to_char,
                                   beam_width=Config.beam_width):
    """
    自定义 CTC Beam Search 解码(单条样本版本)。

    完整实现了 Part 2 第 2 节中的算法流程:

    1. 初始化候选集 Omega = {epsilon}(空前缀)
    2. 对每个时间步 t:
       a. 对 Omega 中每个前缀 l,用 blank 和每个字符 c 分别扩展
       b. 正确处理 "扩展字符与末尾字符相同" 的特殊情况
       c. 从扩展结果中保留 top-W 个前缀
    3. 返回最终概率最高的前缀

    关键细节——为什么需要分 gamma_b 和 gamma_nb:
    考虑前缀 l = "a",如果当前时间步输出字符也是 "a":
    - 如果上一步输出的是 blank,则这个 "a" 是一个新字符 → 前缀变为 "aa"
    - 如果上一步输出的不是 blank,则这个 "a" 是重复 → 前缀仍为 "a"
    因此必须区分"上一步是否输出了 blank",这就是 gamma_b 和 gamma_nb 存在的原因。

    Args:
        logits_single: 单条样本的模型输出,shape = (time, vocab_size)
        logit_length:  有效帧数(int)
        idx_to_char:   索引到字符的映射
        beam_width:    束宽 W

    Returns:
        best_text: 最优解码结果字符串
        beam: 完整的 beam 信息(dict,用于调试分析)
    """
    blank_index = Config.blank_index
    T = int(logit_length)
    vocab_size = logits_single.shape[-1]

    # Softmax: logits → 概率
    if isinstance(logits_single, tf.Tensor):
        logits_np = logits_single.numpy()
    else:
        logits_np = logits_single

    probs = np.exp(logits_np[:T] - np.max(logits_np[:T], axis=-1, keepdims=True))
    probs = probs / np.sum(probs, axis=-1, keepdims=True)

    # ============ 初始化 ============
    # 每个候选用 (前缀文本, {p_b, p_nb}) 表示
    # p_b:  当前时刻末尾为 blank 的概率
    # p_nb: 当前时刻末尾为非 blank 字符的概率
    beam = {
        '': {'p_b': 1.0, 'p_nb': 0.0}
    }

    # ============ 逐时间步搜索 ============
    for t in range(T):
        new_beam = {}

        for prefix, scores in beam.items():
            p_b = scores['p_b']
            p_nb = scores['p_nb']
            p_total = p_b + p_nb

            # ---- 情况 1: 用 blank 扩展 ----
            # 输出 blank 不改变前缀,但更新 p_b
            new_p_b = p_total * probs[t, blank_index]
            if prefix not in new_beam:
                new_beam[prefix] = {'p_b': 0.0, 'p_nb': 0.0}
            new_beam[prefix]['p_b'] += new_p_b

            # ---- 情况 2: 用非 blank 字符扩展 ----
            for c_idx in range(vocab_size):
                if c_idx == blank_index:
                    continue

                c_char = idx_to_char.get(c_idx, '')
                if c_char == '':
                    continue

                prob_c = probs[t, c_idx]
                new_prefix = prefix + c_char

                if prefix and c_char == prefix[-1]:
                    # 特殊情况:扩展字符与末尾字符相同
                    # (a) 上一步是 blank → 新字符 → 前缀变为 l + c
                    if new_prefix not in new_beam:
                        new_beam[new_prefix] = {'p_b': 0.0, 'p_nb': 0.0}
                    new_beam[new_prefix]['p_nb'] += p_b * prob_c

                    # (b) 上一步不是 blank → 重复 → 前缀不变(仍为 l)
                    if prefix not in new_beam:
                        new_beam[prefix] = {'p_b': 0.0, 'p_nb': 0.0}
                    new_beam[prefix]['p_nb'] += p_nb * prob_c
                else:
                    # 普通情况:扩展字符与末尾字符不同
                    if new_prefix not in new_beam:
                        new_beam[new_prefix] = {'p_b': 0.0, 'p_nb': 0.0}
                    new_beam[new_prefix]['p_nb'] += p_total * prob_c

        # ---- 剪枝:保留 top-W ----
        sorted_beam = sorted(
            new_beam.items(),
            key=lambda x: x[1]['p_b'] + x[1]['p_nb'],
            reverse=True
        )[:beam_width]

        beam = dict(sorted_beam)

    # ============ 返回最优结果 ============
    best_prefix = max(
        beam.items(),
        key=lambda x: x[1]['p_b'] + x[1]['p_nb']
    )

    return best_prefix[0], beam

8. 模型评估与指标计算

8.1 评估指标

语音识别系统最常用的评估指标有两个:

CER(Character Error Rate,字符错误率):在字符级别计算编辑距离

CER = (S + D + I) / N,其中 S = 替换数,D = 删除数,I = 插入数,N = 参考文本的字符数。

WER(Word Error Rate,词错误率):在词级别计算编辑距离,是语音识别领域最主流的评估指标。

WER = (S + D + I) / N,公式相同,但 S/D/I/N 的单位是”词”而非”字符”。

8.2 评估代码

# src/evaluate.py

"""
模型评估模块

使用编辑距离(Levenshtein Distance)计算 CER 和 WER。
"""

import numpy as np
from src.config import Config
from src.data_pipeline import indices_to_text


def levenshtein_distance(ref, hyp):
    """
    计算两个序列之间的编辑距离(动态规划实现)。

    Args:
        ref: 参考序列(list)
        hyp: 假设序列(list)

    Returns:
        distance: 编辑距离
        n_ref: 参考序列长度
    """
    m, n = len(ref), len(hyp)
    dp = np.zeros((m + 1, n + 1), dtype=np.int32)

    for i in range(m + 1):
        dp[i][0] = i
    for j in range(n + 1):
        dp[0][j] = j

    for i in range(1, m + 1):
        for j in range(1, n + 1):
            if ref[i - 1] == hyp[j - 1]:
                dp[i][j] = dp[i - 1][j - 1]
            else:
                dp[i][j] = min(
                    dp[i - 1][j] + 1,      # 删除
                    dp[i][j - 1] + 1,      # 插入
                    dp[i - 1][j - 1] + 1   # 替换
                )

    return dp[m][n], m


def compute_cer(references, hypotheses):
    """计算字符错误率"""
    total_dist, total_len = 0, 0
    for ref, hyp in zip(references, hypotheses):
        dist, ref_len = levenshtein_distance(list(ref), list(hyp))
        total_dist += dist
        total_len += ref_len
    return total_dist / max(total_len, 1)


def compute_wer(references, hypotheses):
    """计算词错误率"""
    total_dist, total_len = 0, 0
    for ref, hyp in zip(references, hypotheses):
        dist, ref_len = levenshtein_distance(ref.split(), hyp.split())
        total_dist += dist
        total_len += ref_len
    return total_dist / max(total_len, 1)


def evaluate_model(model, dataset, idx_to_char, decode_fn, num_batches=None):
    """
    在数据集上评估模型。

    Args:
        model: CTC 模型
        dataset: tf.data.Dataset
        idx_to_char: 索引到字符的映射
        decode_fn: 解码函数,签名为 fn(logits, lengths, idx_to_char) -> list[str]
        num_batches: 评估的 batch 数(None 表示全部)

    Returns:
        avg_cer, avg_wer
    """
    all_refs = []
    all_hyps = []

    for i, batch in enumerate(dataset):
        if num_batches and i >= num_batches:
            break

        features, labels, feat_lens, label_lens = batch
        logits = model(features, training=False)

        # 解码
        predictions = decode_fn(logits, feat_lens, idx_to_char)

        # 还原真实标签
        for j in range(labels.shape[0]):
            ref_indices = labels[j][:label_lens[j]].numpy().tolist()
            ref_text = indices_to_text(ref_indices, idx_to_char)
            hyp_text = predictions[j] if j < len(predictions) else ''
            all_refs.append(ref_text)
            all_hyps.append(hyp_text)

    cer = compute_cer(all_refs, all_hyps)
    wer = compute_wer(all_refs, all_hyps)

    print(f"\n评估结果:")
    print(f"  CER: {cer*100:.2f}%")
    print(f"  WER: {wer*100:.2f}%")
    print(f"  评估样本数: {len(all_refs)}")

    # 打印几个解码样本
    print(f"\n  解码示例:")
    for i in range(min(5, len(all_refs))):
        print(f"    REF: {all_refs[i]}")
        print(f"    HYP: {all_hyps[i]}")
        print()

    return cer, wer

9. 完整训练流程与实验结果

9.1 主训练脚本

# main.py

"""
CTC 语音识别系统 - 主训练脚本

运行方式: python main.py
"""

import glob
from src.config import Config
from src.build_vocab import build_vocabulary
from src.data_pipeline import parse_librispeech, create_dataset
from src.train import CTCTrainer
from src.decode import ctc_greedy_decode, ctc_beam_search_decode_tf
from src.evaluate import evaluate_model


def main():
    # ============ Step 1: 构建词表 ============
    print("=" * 60)
    print("Step 1: 构建字符集词表")
    print("=" * 60)
    char_to_idx, idx_to_char = build_vocabulary()
    print(f"字符集大小: {Config.vocab_size}")

    # ============ Step 2: 解析数据集 ============
    print("\n" + "=" * 60)
    print("Step 2: 解析数据集")
    print("=" * 60)
    train_samples = parse_librispeech(Config.data_dir, "train-clean-100")
    val_samples = parse_librispeech(Config.data_dir, "test-clean")

    # ============ Step 3: 构建数据 Pipeline ============
    print("\n" + "=" * 60)
    print("Step 3: 构建数据 Pipeline")
    print("=" * 60)
    train_dataset = create_dataset(train_samples, char_to_idx, is_training=True)
    val_dataset = create_dataset(val_samples, char_to_idx, is_training=False)
    print("数据 Pipeline 构建完成")

    # ============ Step 4: 训练模型 ============
    print("\n" + "=" * 60)
    print("Step 4: 开始训练")
    print("=" * 60)
    trainer = CTCTrainer()
    trainer.model.summary()
    trainer.train(train_dataset, val_dataset, idx_to_char)

    # ============ Step 5: 最终评估 ============
    print("\n" + "=" * 60)
    print("Step 5: 最终评估")
    print("=" * 60)

    # 加载最优模型
    ckpts = sorted(glob.glob(f"{Config.checkpoint_dir}/model_*.h5"))
    if ckpts:
        trainer.load_checkpoint(ckpts[-1])

    # Greedy Decode 评估
    print("\n--- Greedy Decoding ---")
    cer_greedy, wer_greedy = evaluate_model(
        trainer.model, val_dataset, idx_to_char, ctc_greedy_decode
    )

    # Beam Search 评估
    print("\n--- Beam Search Decoding (W=25) ---")
    cer_beam, wer_beam = evaluate_model(
        trainer.model, val_dataset, idx_to_char, ctc_beam_search_decode_tf
    )

    # ============ 结果汇总 ============
    print("\n" + "=" * 60)
    print("实验结果汇总")
    print("=" * 60)
    print(f"  Greedy Decoding    - CER: {cer_greedy*100:.2f}%  WER: {wer_greedy*100:.2f}%")
    print(f"  Beam Search (W=25) - CER: {cer_beam*100:.2f}%  WER: {wer_beam*100:.2f}%")


if __name__ == "__main__":
    main()

9.2 预期实验结果

在 LibriSpeech train-clean-100 上训练约 50-80 个 epoch 后,使用 3 层 BiLSTM(256 units)的模型,预期可以达到以下效果:

解码方法CER (test-clean)WER (test-clean)
Greedy Decoding~15-20%~45-55%
Beam Search (W=25) ~13-18%~40-50%
Beam Search + 外部 LM~8-12%~25-35%

几点说明:

  1. CER 远低于 WER 是正常的——因为即使只错了一个字符,在词级别也算错一整个词。
  2. Beam Search 比 Greedy 好 验证了 Part 2 中的分析:Beam Search 能搜索到更优的标签序列。
  3. 加上外部语言模型后提升显著,说明 CTC 的条件独立假设确实限制了模型对输出序列内部依赖的建模能力,语言模型能有效弥补这一不足。
  4. 以上结果是纯 CTC 模型(无预训练、无数据增强、无外部 LM)的基线水平。当前 SOTA 系统通常使用更大的模型(如 Conformer)、更多的训练数据和更复杂的训练策略。

9.3 训练过程可视化

可以使用 TensorBoard 监控训练过程:

tensorboard --logdir logs/

典型的 CTC 训练 loss 曲线特征:

  • 前 5 个 epoch:loss 下降非常快(模型学会了输出 blank 为主的模式)
  • 5-30 个 epoch:loss 缓慢但稳定下降(模型开始学习真正的字符输出)
  • 30+ 个 epoch:loss 趋于平缓,开始需要学习率衰减来继续优化

10. 常见问题与调优建议

10.1 训练不收敛 / Loss 为 NaN

这是 CTC 训练中最常见的问题。排查步骤:

  1. 检查 blank_index:确认词表中 blank 的索引与 `tf.nn.ctc_loss` 的约定一致。本实现中 blank = index 27(最后一个位置),是 TF 默认值,无需额外指定。
  2. 降低学习率:从 1e-4 甚至 1e-5 开始尝试。
  3. 检查输入长度:确保每条样本的 `feature_length > label_length`。
  4. 检查特征标准化:确保输入特征已做零均值单位方差标准化。
  5. 检查是否重复 Softmax:模型输出必须是 logits,不能手动加 Softmax。

10.2 模型只输出 blank

训练初期模型可能陷入”全输出 blank”的局部最优。这是因为 blank 是”安全”的输出——无论标签是什么,输出全 blank 也不会产生太大的 loss(只是不够好)。解决方法:

  1. 使用更小的初始学习率做预热(warmup)
  2. 确保训练数据的多样性(shuffle)
  3. 适当增加训练数据量

10.3 效果提升方向

如果基线效果不理想,可以从以下方向优化:

  1. 数据增强:速度扰动(Speed Perturbation)、SpecAugment(时间和频率掩码)、添加环境噪声
  2. 模型升级:将 LSTM 替换为 Conformer(卷积 + Transformer),这是当前语音识别的主流架构
  3. 更多训练数据:使用 LibriSpeech 960 小时或更大的数据集
  4. 外部语言模型:在 Beam Search 中融合 n-gram 或 Transformer 语言模型
  5. 学习率策略:使用 warmup + cosine annealing 等更精细的策略
  6. 混合精度训练:使用 FP16 加速训练并增大 batch size

11. 总结与展望

本篇作为 CTC 系列的第三部分,将 Part 1 和 Part 2 中的理论知识转化为了可运行的完整代码实现。我们从零开始搭建了一个基于 TensorFlow 的端到端 CTC 语音识别系统,覆盖了从数据预处理、模型构建、CTC Loss 训练到 Beam Search 解码和模型评估的完整流程。

回顾一下全系列的脉络:

Part 1 解决了”如何训练”的问题:定义了 CTC Loss 函数,利用 Forward-Backward 动态规划算法高效计算 Loss 及其梯度,实现端到端训练。对应到本篇代码中的 `compute_ctc_loss` 函数和 `train_step` 方法。

Part 2 解决了”如何解码”的问题:介绍了 Greedy Decoding 和 Beam Search Decoding 两种算法。对应到本篇代码中的 `ctc_greedy_decode` 和 `ctc_beam_search_decode_custom` 函数。

Part 3(本篇) 解决了”如何实现”的问题:将理论公式转化为工程代码,并提供了完整的训练与评估流水线。

尽管 CTC 算法已经提出近 20 年,它至今仍在语音识别领域发挥着重要作用。现代的语音识别系统(如 Whisper、Conformer-Transducer 等)虽然在架构上有所演进,但许多核心思想仍然可以追溯到 CTC——端到端训练的范式、序列级别的损失函数、动态规划的求解思路等。深入理解 CTC 算法,不仅有助于掌握语音识别的基础,也为理解更前沿的技术提供了坚实的理论基石。

希望通过这三篇文章,读者能够对 CTC 算法建立起从理论到实践的完整认知。

References

  1. Graves et al., Connectionist Temporal Classification: Labelling Unsegmented Sequence Data with RNNs. In ICML, 2006. (Graves提出CTC算法的原始论文)
  2. Graves et al., A Novel Connectionist System for Unconstrained Handwriting Recognition. In IEEE Transactions on PAML, 2009.(CTC算法在手写字识别中的应用)
  3. Graves et al., Towards End-to-End Recognition with RNNs. In JMLR, 2014.(CTC算法在端到端声学模型中的应用)
  4. Alex Graves, Supervised Sequence Labelling with Recurrent Neural Networks. In Studies in Computational Intelligence, Springer, 2012.( Graves 的博士论文,关于sequence learning的研究,主要是CTC)
  5. Watanabe et al., Hybrid CTC/Attention Architecture for End-to-End Speech Recognition. IEEE Journal of Selected Topics in Signal Processing, 2017.(CTC/Attention 联合解码的代表性工作)
  6. Miao et al., EESEN: End-to-End Speech Recognition using Deep RNN Models and WFST-Based Decoding. In ASRU, 2015.(基于 WFST 的 CTC 解码框架)
  7. TensorFlow CTC API 文档:https://www.tensorflow.org/api_docs/python/tf/nn/ctc_loss
  8. Librosa 文档:https://librosa.org/doc/latest/index.html