转载本文请注明出处:https://yudonglee.me/ctc-explained-part3 作者:yudonglee
本系列文章总共分为三部分来全面阐述CTC算法(本篇为Part 3):
Part 1:Training the Network(训练算法篇),介绍CTC理论原理,包括问题定义、公式推导、算法过程等。Part 1链接。
Part 2:Decoding the Network(解码算法篇),介绍CTC Decoding的几种常用算法。Part 2链接。
Part 3:CTC Demo by Speech Recognition(语音识别实战篇),基于 TensorFlow 实现完整的 CTC 语音识别系统,即本篇。
在前两篇文章中,我们分别从理论层面深入剖析了 CTC 的训练算法和解码算法。理论固然重要,但”纸上得来终觉浅,绝知此事要躬行”。本篇将带领读者从零开始,基于 TensorFlow 2.x 搭建一个完整的端到端语音识别系统,将 Part 1 和 Part 2 中的理论知识落地为可运行的代码。
本篇的内容组织如下:
- 开发环境搭建
- 数据集介绍与准备
- 音频特征提取(MFCC)
- 数据预处理 Pipeline
- 模型架构设计(BiLSTM + CTC)
- CTC Loss 函数与训练过程
- CTC Beam Search 解码实现
- 模型评估与指标计算(CER / WER)
- 完整训练流程与实验结果
- 常见问题与调优建议
- 总结与展望
接下来,让我们一步步开始搭建。
1. 开发环境搭建
1.1 环境要求
本实战项目基于以下环境:
- Python 3.8+
- TensorFlow 2.10+(包含 `tf.nn.ctc_loss` 等 CTC 相关 API)
- NumPy 1.21+
- Librosa 0.9+(音频处理与特征提取)
- SoundFile 0.10+(音频文件读取)
- Matplotlib 3.5+(可视化)
- tqdm(训练进度显示)
1.2 安装依赖
# 创建虚拟环境(推荐)
conda create -n ctc-asr python=3.8 -y
conda activate ctc-asr
# 安装核心依赖(TF 2.10+ 已统一 CPU/GPU 包,无需单独安装 tensorflow-gpu)
pip install tensorflow==2.10.0
pip install numpy librosa soundfile matplotlib tqdm
1.3 项目结构
ctc-speech-recognition/
├── data/
│ ├── raw/ # 原始音频和标注文件
│ ├── processed/ # 预处理后的特征文件
│ └── vocab.json # 字符集词表
├── src/
│ ├── config.py # 超参数配置
│ ├── build_vocab.py # 词表构建
│ ├── feature_extraction.py # 音频特征提取
│ ├── data_pipeline.py # 数据加载与预处理
│ ├── model.py # 模型定义
│ ├── train.py # 训练脚本
│ ├── decode.py # 解码模块
│ └── evaluate.py # 评估脚本
├── notebooks/
│ └── visualization.ipynb # 可视化分析
├── checkpoints/ # 模型检查点
├── logs/ # TensorBoard 日志
├── main.py # 主入口
└── README.md
我们先从配置文件开始,定义项目中用到的所有超参数:
# src/config.py
"""
CTC 语音识别系统 - 超参数配置
"""
class Config:
# ============ 音频特征参数 ============
sample_rate = 16000 # 采样率(Hz)
n_mfcc = 39 # MFCC 特征维度(13维MFCC + 13维一阶差分 + 13维二阶差分)
n_fft = 512 # FFT 窗口大小
hop_length = 160 # 帧移(10ms = 16000 * 0.01)
win_length = 400 # 帧长(25ms = 16000 * 0.025)
n_mels = 80 # Mel 滤波器组数量
# ============ 模型参数 ============
rnn_units = 256 # LSTM 隐藏层单元数
num_rnn_layers = 3 # LSTM 层数
dropout_rate = 0.3 # Dropout 比例
use_bidirectional = True # 是否使用双向 LSTM
# ============ 训练参数 ============
batch_size = 32 # 批大小
epochs = 100 # 最大训练轮数
learning_rate = 1e-3 # 初始学习率
lr_decay_factor = 0.5 # 学习率衰减因子
lr_decay_patience = 5 # 学习率衰减耐心值
max_audio_length = 16.0 # 最大音频长度(秒)
clip_grad_norm = 5.0 # 梯度裁剪阈值
# ============ 解码参数 ============
beam_width = 25 # Beam Search 束宽
lm_weight = 0.0 # 语言模型权重(本篇暂不使用外部LM)
length_bonus = 0.0 # 长度奖励系数
# ============ 路径配置 ============
data_dir = "data/"
checkpoint_dir = "checkpoints/"
log_dir = "logs/"
# ============ 字符集参数 ============
# blank 放在最后一个索引(index 27),这是 TensorFlow ctc_loss 的默认约定
vocab_size = 28 # 26字母 + 1空格 + 1blank
blank_index = 27 # blank 在字符集中的索引
这里有几个参数值得特别说明:
- n_mfcc = 39:这是语音识别中最经典的特征维度配置。13 维 MFCC 基础系数 + 13 维一阶差分(delta)+ 13 维二阶差分(delta-delta),共 39 维。一阶差分捕捉特征的变化趋势,二阶差分捕捉变化的加速度,这些动态特征对识别准确率至关重要。
- hop_length = 160:对应 10ms 的帧移(16000 Hz × 0.01s = 160 个采样点),这是语音识别中的标准设定。
- clip_grad_norm = 5.0:CTC 训练初期梯度容易爆炸,梯度裁剪是必不可少的稳定手段。
- blank_index = 27:将 blank 放在字符集的最后一个索引位置。这是 TensorFlow `tf.nn.ctc_loss` 的默认行为,避免了需要额外指定 `blank_index` 参数带来的兼容性问题。
2. 数据集介绍与准备
2.1 数据集选择
本实战选用 LibriSpeech 数据集作为示例,同时简要介绍 TIMIT 数据集供读者参考。
TIMIT(Texas Instruments / MIT):
- 经典的英文语音识别基准数据集
- 包含 6300 条语音,共约 5.4 小时
- 630 位说话人,每人 10 句
- 提供音素(phoneme)级别的时间对齐标注
- 数据量小,适合快速实验和原型验证
LibriSpeech:
- 大规模英文有声书语音数据集
- train-clean-100:约 100 小时干净语音
- train-clean-360:约 360 小时干净语音
- 提供文本转录,不提供时间对齐
- 数据量大,适合训练更好的模型
本篇以 LibriSpeech train-clean-100 为主要示例,因为它无需时间对齐标注,与 CTC 的端到端训练范式完美契合。
2.2 下载数据集
# 下载 LibriSpeech train-clean-100(约 6.3GB)
wget https://www.openslr.org/resources/12/train-clean-100.tar.gz
tar -xzf train-clean-100.tar.gz -C data/raw/
# 下载测试集 test-clean(约 346MB)
wget https://www.openslr.org/resources/12/test-clean.tar.gz
tar -xzf test-clean.tar.gz -C data/raw/
2.3 数据集结构解析
LibriSpeech 的目录结构如下:
data/raw/LibriSpeech/
├── train-clean-100/
│ ├── 19/ # 说话人 ID
│ │ ├── 198/ # 章节 ID
│ │ │ ├── 19-198.trans.txt # 该章节的文本转录
│ │ │ ├── 19-198-0000.flac # 音频文件 (FLAC格式)
│ │ │ ├── 19-198-0001.flac
│ │ │ └── ...
│ │ └── ...
│ └── ...
└── test-clean/
└── ...
其中 `.trans.txt` 文件的格式为:
19-198-0000 NORTHANGER ABBEY
19-198-0001 THIS LITTLE WORK WAS FINISHED IN THE YEAR ...
每行的格式是 `{说话人ID}-{章节ID}-{句子ID} {转录文本}`。
2.4 构建字符集词表
CTC 模型的输出单元是字符级别的。我们需要构建一个字符集词表(vocabulary),将每个字符映射为一个整数索引。
# src/build_vocab.py
"""
构建字符集词表
CTC 的输出字符集 = 26个英文字母 + 空格 + blank
其中 blank 是 CTC 算法引入的特殊符号(详见 Part 1 第4节),
用于表示"当前时间步无输出",在映射函数 B 中会被移除。
"""
import json
import os
from src.config import Config
def build_vocabulary():
"""
构建字符到索引的映射表。
索引分配规则:
- index 0: 空格 '<space>'
- index 1-26: 字母 a-z
- index 27: CTC blank 符号
重要:blank 放在最后一个索引(27),这是 TensorFlow tf.nn.ctc_loss
的默认约定。如果 blank 放在 index 0,则需要显式传入 blank_index=0,
且不同 TF 版本的兼容性可能存在问题。将 blank 放在最后可以避免这类隐患。
"""
vocab = {
'<space>': 0, # 空格
}
# 26个英文小写字母,index 1-26
for i, char in enumerate('abcdefghijklmnopqrstuvwxyz'):
vocab[char] = i + 1
# blank 放在最后,index 27
vocab['<blank>'] = Config.blank_index # 27
# 反向映射:索引 -> 字符
idx_to_char = {}
for k, v in vocab.items():
if k == '<space>':
idx_to_char[v] = ' '
elif k == '<blank>':
idx_to_char[v] = '' # blank 映射为空字符串(解码时跳过)
else:
idx_to_char[v] = k
print(f"字符集大小: {len(vocab)}")
print(f"字符集: {list(vocab.keys())}")
print(f"blank 索引: {Config.blank_index}")
# 保存词表
os.makedirs("data/", exist_ok=True)
with open("data/vocab.json", "w") as f:
json.dump({
"char_to_idx": vocab,
"idx_to_char": {str(k): v for k, v in idx_to_char.items()},
"vocab_size": len(vocab), # 28
"blank_index": Config.blank_index # 27
}, f, indent=2, ensure_ascii=False)
return vocab, idx_to_char
if __name__ == "__main__":
vocab, idx_to_char = build_vocabulary()
# 输出:
# 字符集大小: 28
# 字符集: ['<space>', 'a', 'b', ..., 'z', '<blank>']
# blank 索引: 27
这里的字符集设计有几点值得说明:
- 为什么用字符级别而非词级别? CTC 要求输入序列长度 >= 输出序列长度。如果使用词级别(词汇量可能上万),输出序列会很短,但每个位置的分类空间极大,模型难以学习。字符级别的输出序列虽然更长,但分类空间只有 28 个类别,学习难度大幅降低。
- blank 符号为什么放在最后一个索引?TensorFlow 的 `tf.nn.ctc_loss` 默认将最后一个索引视为 blank。将 blank 放在 index 27(最后一个位置),可以直接使用默认参数,无需额外指定 `blank_index`,兼容性最好。
- 没有使用标点符号:LibriSpeech 的转录文本都是大写字母且没有标点,我们统一转为小写后,字符集非常简洁。
3. 音频特征提取(MFCC)
3.1 MFCC 特征简介
MFCC(Mel-Frequency Cepstral Coefficients,梅尔频率倒谱系数)是语音识别中最经典、最广泛使用的声学特征。其计算流程如下:
原始音频波形
↓ 预加重(Pre-emphasis)
↓ 分帧(Framing):每帧 25ms,帧移 10ms
↓ 加窗(Windowing):汉明窗
↓ 快速傅里叶变换(FFT)
↓ Mel 滤波器组(Mel Filter Bank)
↓ 取对数(Log)
↓ 离散余弦变换(DCT)
↓
MFCC 特征(13维)
↓ 一阶差分(Delta)→ 13维
↓ 二阶差分(Delta-Delta)→ 13维
↓
最终特征(39维)
MFCC 的设计模拟了人耳的听觉特性——Mel 滤波器组在低频区分辨率高、高频区分辨率低,这与人耳对低频更敏感的生理特性一致。
3.2 特征提取代码
# src/feature_extraction.py
"""
音频特征提取模块
将原始音频波形转换为 MFCC 特征序列,供模型输入使用。
每一帧(10ms)的音频生成一个 39 维的特征向量,
因此一段 T 秒的音频会生成约 T × 100 帧的特征序列。
回顾 Part 1 第3节中的定义:
- 输入序列 x = (x_1, x_2, ..., x_T),其中每个 x_t 是一个 39 维向量
- T = 音频时长(秒)× 100(因为帧移为 10ms)
"""
import numpy as np
import librosa
import soundfile as sf
from src.config import Config
def load_audio(audio_path, target_sr=Config.sample_rate):
"""
加载音频文件并重采样到目标采样率。
Args:
audio_path: 音频文件路径(支持 .wav, .flac 等格式)
target_sr: 目标采样率,默认 16000 Hz
Returns:
audio: 一维 numpy 数组,归一化到 [-1, 1]
sr: 采样率
"""
audio, sr = sf.read(audio_path)
# 如果是多声道,取第一个声道
if len(audio.shape) > 1:
audio = audio[:, 0]
# 重采样
if sr != target_sr:
audio = librosa.resample(audio, orig_sr=sr, target_sr=target_sr)
sr = target_sr
# 归一化
max_val = np.max(np.abs(audio))
if max_val > 0:
audio = audio / max_val
return audio, sr
def extract_mfcc(audio, sr=Config.sample_rate):
"""
从音频波形中提取 MFCC 特征。
提取过程:
1. 计算 13 维 MFCC 基础系数
2. 计算一阶差分(delta),捕捉特征随时间的变化速率
3. 计算二阶差分(delta-delta),捕捉变化的加速度
4. 拼接三者得到 39 维特征向量
Args:
audio: 一维音频波形数组
sr: 采样率
Returns:
features: numpy 数组,shape = (time_steps, 39)
其中 time_steps ≈ 音频时长(秒) × 100
"""
# Step 1: 计算 13 维 MFCC
mfcc = librosa.feature.mfcc(
y=audio,
sr=sr,
n_mfcc=13, # 取前 13 个 MFCC 系数
n_fft=Config.n_fft, # FFT 窗口大小
hop_length=Config.hop_length, # 帧移:160个采样点 = 10ms
win_length=Config.win_length, # 帧长:400个采样点 = 25ms
n_mels=Config.n_mels # Mel 滤波器数量
)
# mfcc.shape = (13, time_steps)
# Step 2: 计算一阶差分(Delta)
# Delta 特征反映了 MFCC 在时间维度上的变化趋势
delta = librosa.feature.delta(mfcc, order=1)
# Step 3: 计算二阶差分(Delta-Delta)
# 反映变化的加速度,对于捕捉语音的动态特性很重要
delta2 = librosa.feature.delta(mfcc, order=2)
# Step 4: 拼接三者
features = np.concatenate([mfcc, delta, delta2], axis=0)
# features.shape = (39, time_steps)
# 转置为 (time_steps, 39),符合 RNN 输入格式:(序列长度, 特征维度)
features = features.T
# Step 5: 特征标准化(零均值,单位方差)
# 这一步对模型训练非常重要,可以加速收敛并提升最终效果
mean = np.mean(features, axis=0)
std = np.std(features, axis=0)
features = (features - mean) / (std + 1e-8) # 加 epsilon 防止除零
return features.astype(np.float32)
def extract_features_from_file(audio_path):
"""
从音频文件直接提取特征的便捷函数。
Args:
audio_path: 音频文件路径
Returns:
features: numpy 数组,shape = (time_steps, 39)
"""
audio, sr = load_audio(audio_path)
# 限制最大音频长度,避免内存溢出
max_samples = int(Config.max_audio_length * sr)
if len(audio) > max_samples:
audio = audio[:max_samples]
features = extract_mfcc(audio, sr)
return features
# ============ 可视化辅助函数 ============
def plot_features(audio_path, save_path=None):
"""
可视化音频波形和 MFCC 特征,帮助直观理解特征提取过程。
"""
import matplotlib.pyplot as plt
audio, sr = load_audio(audio_path)
features = extract_mfcc(audio, sr)
fig, axes = plt.subplots(3, 1, figsize=(14, 10))
# 1. 原始波形
time_axis = np.arange(len(audio)) / sr
axes[0].plot(time_axis, audio, linewidth=0.5)
axes[0].set_title("Raw Waveform")
axes[0].set_xlabel("Time (s)")
axes[0].set_ylabel("Amplitude")
# 2. MFCC 特征(前13维)
axes[1].imshow(features[:, :13].T, aspect='auto', origin='lower', cmap='viridis')
axes[1].set_title("MFCC Features (13-dim)")
axes[1].set_xlabel("Frame Index")
axes[1].set_ylabel("MFCC Coefficient")
# 3. 完整 39 维特征
axes[2].imshow(features.T, aspect='auto', origin='lower', cmap='viridis')
axes[2].set_title("Full Features (39-dim: MFCC + Delta + Delta-Delta)")
axes[2].set_xlabel("Frame Index")
axes[2].set_ylabel("Feature Dimension")
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=150)
plt.show()
print(f"音频时长: {len(audio)/sr:.2f}s")
print(f"特征维度: {features.shape}")
print(f"帧数 (time_steps): {features.shape[0]}")
3.3 特征提取效果验证
# 快速验证
features = extract_features_from_file("data/raw/LibriSpeech/train-clean-100/19/198/19-198-0000.flac")
print(f"特征形状: {features.shape}")
# 输出示例: 特征形状: (547, 39)
# 说明这段音频生成了 547 帧,每帧 39 维特征
# 可视化
plot_features("data/raw/LibriSpeech/train-clean-100/19/198/19-198-0000.flac")
4. 数据预处理 Pipeline
4.1 数据解析与文本编码
# src/data_pipeline.py
"""
数据加载与预处理 Pipeline
负责:
1. 解析 LibriSpeech 的目录结构和标注文件
2. 将文本转录转为整数索引序列
3. 构建 tf.data.Dataset,实现高效的数据加载与动态 padding
"""
import os
import json
import numpy as np
import tensorflow as tf
from src.config import Config
from src.feature_extraction import extract_features_from_file
def parse_librispeech(data_dir, split="train-clean-100"):
"""
解析 LibriSpeech 数据集,返回 (音频路径, 转录文本) 列表。
Args:
data_dir: 数据根目录
split: 数据集分割名称
Returns:
samples: [(audio_path, transcript), ...] 的列表
"""
split_dir = os.path.join(data_dir, "raw", "LibriSpeech", split)
samples = []
for speaker_id in sorted(os.listdir(split_dir)):
speaker_dir = os.path.join(split_dir, speaker_id)
if not os.path.isdir(speaker_dir):
continue
for chapter_id in sorted(os.listdir(speaker_dir)):
chapter_dir = os.path.join(speaker_dir, chapter_id)
if not os.path.isdir(chapter_dir):
continue
# 读取转录文件
trans_file = os.path.join(
chapter_dir, f"{speaker_id}-{chapter_id}.trans.txt"
)
if not os.path.exists(trans_file):
continue
with open(trans_file, "r") as f:
for line in f:
line = line.strip()
if not line:
continue
# 格式: "19-198-0000 NORTHANGER ABBEY"
parts = line.split(" ", 1)
if len(parts) < 2:
continue
utt_id = parts[0]
transcript = parts[1].lower() # 统一转为小写
audio_path = os.path.join(chapter_dir, f"{utt_id}.flac")
if os.path.exists(audio_path):
samples.append((audio_path, transcript))
print(f"[{split}] 共解析到 {len(samples)} 条样本")
return samples
def text_to_indices(text, char_to_idx):
"""
将文本转录转为整数索引序列。
回顾 Part 1 第3节:
- 输出标签 z = (z_1, z_2, ..., z_U)
- 每个 z_u 是字符集中的一个字符
- 这里我们将字符映射为整数索引,方便模型处理
例如:
text = "hello world"
输出 = [8, 5, 12, 12, 15, 0, 23, 15, 18, 12, 4]
其中 0 代表空格,其他数字代表对应的字母
"""
indices = []
for char in text:
if char == ' ':
indices.append(char_to_idx['<space>'])
elif char in char_to_idx:
indices.append(char_to_idx[char])
# 忽略不在字符集中的字符(如标点)
return indices
def indices_to_text(indices, idx_to_char):
"""
将整数索引序列转回文本(解码时使用)。
注意:
- blank 索引(27)会被映射为空字符串(跳过)
- 空格索引(0)会被映射为空格
"""
chars = []
for idx in indices:
idx_key = int(idx)
char = idx_to_char.get(idx_key, '')
if char: # 跳过 blank(映射为空字符串)
chars.append(char)
return ''.join(chars)
4.2 构建 tf.data.Dataset
tf.data.Dataset 是 TensorFlow 推荐的数据加载方式,它支持数据的惰性加载、预取(prefetch)、动态填充(padded_batch)等高效操作。这在处理变长音频数据时尤为重要。
# src/data_pipeline.py(续)
def create_dataset(samples, char_to_idx, is_training=True):
"""
构建 tf.data.Dataset。
CTC 训练需要以下四个输入:
1. features: 音频特征序列,shape = (batch, max_time, 39)
2. labels: 文本索引序列,shape = (batch, max_label_len)
3. feature_lengths: 每条样本的实际特征帧数,shape = (batch,)
4. label_lengths: 每条样本的实际标签长度,shape = (batch,)
其中 feature_lengths 和 label_lengths 对于 CTC Loss 的计算至关重要——
它们告诉 CTC 算法每条样本的有效序列长度是多少,
padding 部分不应参与 Loss 计算。
"""
def _process_sample(audio_path, transcript):
"""处理单条样本:提取特征 + 文本编码"""
# 提取 MFCC 特征
features = extract_features_from_file(audio_path.numpy().decode('utf-8'))
# 文本转索引
text = transcript.numpy().decode('utf-8')
label = text_to_indices(text, char_to_idx)
label = np.array(label, dtype=np.int32)
feature_length = np.int32(features.shape[0])
label_length = np.int32(len(label))
return features, label, feature_length, label_length
def tf_process_sample(audio_path, transcript):
"""tf.py_function 封装,使 numpy 操作能在 tf.data 管线中运行"""
features, label, feat_len, label_len = tf.py_function(
func=_process_sample,
inp=[audio_path, transcript],
Tout=[tf.float32, tf.int32, tf.int32, tf.int32]
)
# 设置形状信息(tf.py_function 会丢失形状信息)
features.set_shape([None, Config.n_mfcc])
label.set_shape([None])
feat_len.set_shape([])
label_len.set_shape([])
return features, label, feat_len, label_len
# 构建数据集
audio_paths = [s[0] for s in samples]
transcripts = [s[1] for s in samples]
dataset = tf.data.Dataset.from_tensor_slices((audio_paths, transcripts))
if is_training:
dataset = dataset.shuffle(buffer_size=min(len(samples), 10000))
dataset = dataset.map(
tf_process_sample,
num_parallel_calls=tf.data.AUTOTUNE # 并行处理加速
)
# 过滤掉异常样本(特征帧数必须 > 标签长度,这是 CTC 的基本约束)
dataset = dataset.filter(
lambda feat, lab, fl, ll: tf.greater(fl, ll)
)
# 动态 padding + 分批
# padding_values 中标签用 blank_index 填充(不会影响 CTC Loss 计算,
# 因为 label_lengths 已经告诉了 CTC 有效标签的长度)
dataset = dataset.padded_batch(
batch_size=Config.batch_size,
padded_shapes=(
[None, Config.n_mfcc], # features: 填充时间维度
[None], # labels: 填充到最长标签
[], # feature_length: 标量,不填充
[] # label_length: 标量,不填充
),
padding_values=(
0.0, # 特征用 0.0 填充
np.int32(Config.blank_index), # 标签用 blank 索引填充
np.int32(0), # feature_length
np.int32(0) # label_length
),
drop_remainder=is_training # 训练时丢弃不完整的最后一个 batch
)
dataset = dataset.prefetch(tf.data.AUTOTUNE) # 预取加速
return dataset
这段代码中有几个关键的设计决策:
- filter 操作:`fl > ll` 确保输入序列长度大于输出序列长度。回顾 Part 1 第 3 节,这是 CTC 的基本约束条件——如果输入帧数不够标签字符数多,CTC 无法为每个字符分配至少一个时间步。
- padded_batch:这是处理变长序列的标准做法。不同音频的特征帧数不同,需要 padding 到同一长度才能组成 batch。但 padding 部分不参与 CTC Loss 的计算,这就是为什么我们需要传入 `feature_lengths` 和 `label_lengths`。
- prefetch:在 GPU 训练当前 batch 的同时,CPU 预取下一个 batch 的数据,实现流水线并行,大幅提升训练吞吐量。
5. 模型架构设计(BiLSTM + CTC)
5.1 架构概览
我们采用经典的 BiLSTM + FC + CTC 架构。这也是 Alex Graves 在 CTC 原始论文及后续工作中使用的架构。
输入特征: (batch, T, 39)
↓
[BiLSTM Layer 1] → (batch, T, 512) # 256 forward + 256 backward
↓ Dropout
[BiLSTM Layer 2] → (batch, T, 512)
↓ Dropout
[BiLSTM Layer 3] → (batch, T, 512)
↓ Dropout
[Dense Layer] → (batch, T, 28) # 投影到字符集大小
↓
[CTC Loss / CTC Decode] # 注意:这里没有 Softmax 层!
回顾 Part 1 第 4 节和第 5 节:
- RNN(这里是 BiLSTM)在每个时间步 t 输出一个概率向量 y_t,维度等于扩展字符集大小(26 字母 + 空格 + blank = 28)
- y_t^k 表示时间步 t 输出字符 k 的概率
- CTC Loss 通过累加所有合法路径的概率来计算
5.2 为什么选择双向 LSTM?
双向 LSTM(BiLSTM)包含前向和后向两个方向的 LSTM,能够同时利用过去和未来的上下文信息。在语音识别中,一个音素的发音往往受到前后音素的影响(协同发音现象),因此双向结构能显著提升识别效果。
在非流式(offline)语音识别场景中,整条音频是可以提前获取的,因此使用双向结构没有问题。但在流式(streaming / online)场景中,只能使用单向 LSTM 或带有限前看(limited lookahead)的结构。
5.3 模型代码
# src/model.py
"""
CTC 语音识别模型定义
架构:多层 BiLSTM + 全连接层
回顾 Part 1 第 5 节中的模型结构图:
- RNN 部分:本实现使用 3 层 BiLSTM(比原论文中的单层更深)
- 输出层:Dense(vocab_size),输出 logits(不加 Softmax!)
- 训练时:CTC Loss 内部自行做 Softmax
- 解码时:CTC Decode 内部自行做 Softmax
"""
import tensorflow as tf
from src.config import Config
def build_ctc_model(vocab_size=Config.vocab_size, input_dim=Config.n_mfcc):
"""
构建 CTC 语音识别模型。
Args:
vocab_size: 字符集大小(包括 blank),本实验为 28
input_dim: 输入特征维度,默认 39
Returns:
model: tf.keras.Model 实例
"""
# ============ 输入层 ============
input_features = tf.keras.Input(
shape=(None, input_dim),
dtype=tf.float32,
name="input_features"
)
x = input_features
# ============ BiLSTM 层 ============
for i in range(Config.num_rnn_layers):
# return_sequences=True: 返回每个时间步的输出,而非仅最后一步
# 这对 CTC 至关重要——CTC 需要每个时间步都有输出!
#
# 注意:LSTM 的 dropout 参数只在 training=True 时生效,
# 这里不额外加 Dropout 层,避免双重 dropout。
lstm = tf.keras.layers.LSTM(
units=Config.rnn_units,
return_sequences=True,
dropout=Config.dropout_rate, # 输入到隐藏层的 dropout
recurrent_dropout=0, # 设为 0 以兼容 cuDNN 加速
name=f"lstm_{i}"
)
if Config.use_bidirectional:
x = tf.keras.layers.Bidirectional(
lstm,
merge_mode='concat', # 前向和后向的输出拼接
name=f"bilstm_{i}"
)(x)
# 拼接后维度: (batch, time_steps, rnn_units * 2) = (batch, T, 512)
else:
x = lstm(x)
# 维度: (batch, time_steps, rnn_units) = (batch, T, 256)
# ============ 输出层 ============
# 全连接层:将 LSTM 输出投影到字符集大小
# 注意:输出 logits(无激活函数),不加 Softmax!
# tf.nn.ctc_loss 和 tf.nn.ctc_beam_search_decoder 都接受 logits 输入,
# 内部自行做 log_softmax / softmax。
# 如果这里多加一层 Softmax,等于做了两次 Softmax,训练将完全不收敛!
logits = tf.keras.layers.Dense(
vocab_size,
activation=None, # 输出 logits
name="output_dense"
)(x)
# logits.shape = (batch, time_steps, vocab_size)
model = tf.keras.Model(
inputs=input_features,
outputs=logits,
name="ctc_speech_recognition"
)
return model
# ============ 快速测试 ============
if __name__ == "__main__":
model = build_ctc_model()
model.summary()
# 模拟输入测试
dummy_input = tf.random.normal((2, 300, 39)) # batch=2, 300帧, 39维
output = model(dummy_input, training=False)
print(f"\n输入形状: {dummy_input.shape}")
print(f"输出形状: {output.shape}")
# 预期输出: (2, 300, 28)
# 解释: 每个时间步输出一个 28 维的 logits 向量,
# 对应 28 个字符(含 blank)的未归一化分数
5.4 关于模型输出 logits 的重要说明
模型输出的是 **logits**(未经 Softmax 的原始分数),而不是概率。这一点极其重要:
- 训练时:`tf.nn.ctc_loss` 内部会自动对 logits 做 log_softmax,然后计算 CTC Loss。
- 解码时:`tf.nn.ctc_beam_search_decoder` 同样接受 logits 输入,内部处理 softmax。
- 常见 bug:如果在模型中额外加了 Softmax 层,等于对概率做了两次 Softmax,训练将完全不收敛,但不会报任何错误。这是 CTC 实现中最隐蔽的陷阱之一。
6. CTC Loss 函数与训练过程
6.1 CTC Loss 的调用
TensorFlow 已经内置了 CTC Loss 的高效实现(基于 Part 1 第 6 节中介绍的 Forward-Backward 动态规划算法),我们可以直接调用:
# src/train.py
"""
CTC 模型训练模块
核心流程:
1. 模型前向传播 → 得到 logits
2. 计算 CTC Loss(TF 内部使用 Forward-Backward 算法)
3. 反向传播 → 计算梯度(对应 Part 1 第 7 节的 CTC Loss 求导)
4. 梯度裁剪 → 更新参数
"""
import os
import time
import numpy as np
import tensorflow as tf
from tqdm import tqdm
from src.config import Config
from src.model import build_ctc_model
from src.data_pipeline import parse_librispeech, create_dataset, indices_to_text
from src.decode import ctc_greedy_decode
from src.evaluate import compute_cer, compute_wer
def compute_ctc_loss(logits, labels, logit_lengths, label_lengths):
"""
计算 CTC Loss。
这里直接调用 TensorFlow 的 tf.nn.ctc_loss,其内部实现了
Part 1 第 6 节中的 Forward-Backward 动态规划算法:
1. 对 logits 做 log_softmax 得到每个时间步的对数概率分布
2. 构建扩展标签序列 l'(插入 blank)
3. 前向传播:计算前向概率 alpha(s, t)
4. 后向传播:计算后向概率 beta(s, t)
5. 综合前向和后向概率计算 p(z|x)
6. 取负对数得到 CTC Loss
Args:
logits: 模型输出,shape = (batch, max_time, vocab_size)
labels: 标签索引,shape = (batch, max_label_len)
logit_lengths: 每条样本的有效帧数,shape = (batch,)
label_lengths: 每条样本的有效标签长度,shape = (batch,)
Returns:
loss: 标量,batch 内所有样本的平均 CTC Loss
"""
# tf.nn.ctc_loss 要求 logits 的时间维度在第一维(当 logits_time_major=True 时)
# 转置: (batch, time, vocab) -> (time, batch, vocab)
logits_transposed = tf.transpose(logits, perm=[1, 0, 2])
# 调用 CTC Loss
# blank_index 默认为 vocab_size - 1(即最后一个索引 = 27),
# 与我们的词表设计一致,因此无需额外指定。
loss = tf.nn.ctc_loss(
labels=labels,
logits=logits_transposed,
label_length=label_lengths,
logit_length=logit_lengths,
logits_time_major=True, # logits 的第一维是时间
)
# loss.shape = (batch,),每条样本一个 loss 值
# 返回 batch 平均 loss
return tf.reduce_mean(loss)
6.2 训练循环
# src/train.py(续)
class CTCTrainer:
"""CTC 模型训练器"""
def __init__(self, vocab_size=Config.vocab_size):
self.model = build_ctc_model(vocab_size)
self.optimizer = tf.keras.optimizers.Adam(
learning_rate=Config.learning_rate
)
self.vocab_size = vocab_size
# TensorBoard 日志
os.makedirs(Config.log_dir, exist_ok=True)
self.train_summary_writer = tf.summary.create_file_writer(
os.path.join(Config.log_dir, "train")
)
self.val_summary_writer = tf.summary.create_file_writer(
os.path.join(Config.log_dir, "val")
)
@tf.function(
input_signature=[
tf.TensorSpec(shape=[None, None, Config.n_mfcc], dtype=tf.float32),
tf.TensorSpec(shape=[None, None], dtype=tf.int32),
tf.TensorSpec(shape=[None], dtype=tf.int32),
tf.TensorSpec(shape=[None], dtype=tf.int32),
]
)
def train_step(self, features, labels, feature_lengths, label_lengths):
"""
单步训练(使用 @tf.function 编译为计算图加速)。
对应 Part 1 第 7 节的完整流程:
1. 前向传播 → logits
2. CTC Loss 计算(Forward-Backward 算法)
3. 反向传播 → 梯度计算
4. 梯度裁剪 → 防止梯度爆炸
5. 参数更新
"""
with tf.GradientTape() as tape:
# Step 1: 前向传播
logits = self.model(features, training=True)
# Step 2: 计算 CTC Loss
loss = compute_ctc_loss(
logits, labels, feature_lengths, label_lengths
)
# Step 3: 计算梯度
gradients = tape.gradient(loss, self.model.trainable_variables)
# Step 4: 梯度裁剪
# CTC 训练初期梯度非常不稳定,裁剪是必需的
gradients, grad_norm = tf.clip_by_global_norm(
gradients, Config.clip_grad_norm
)
# Step 5: 更新参数
self.optimizer.apply_gradients(
zip(gradients, self.model.trainable_variables)
)
return loss, grad_norm
@tf.function(
input_signature=[
tf.TensorSpec(shape=[None, None, Config.n_mfcc], dtype=tf.float32),
tf.TensorSpec(shape=[None, None], dtype=tf.int32),
tf.TensorSpec(shape=[None], dtype=tf.int32),
tf.TensorSpec(shape=[None], dtype=tf.int32),
]
)
def val_step(self, features, labels, feature_lengths, label_lengths):
"""验证步骤(不计算梯度,不更新参数)"""
logits = self.model(features, training=False)
loss = compute_ctc_loss(
logits, labels, feature_lengths, label_lengths
)
return loss
def train(self, train_dataset, val_dataset, idx_to_char):
"""
完整训练流程。
Args:
train_dataset: 训练集 tf.data.Dataset
val_dataset: 验证集 tf.data.Dataset
idx_to_char: 索引到字符的映射表
"""
best_val_loss = float('inf')
no_improve_count = 0
for epoch in range(Config.epochs):
start_time = time.time()
# ======== 训练阶段 ========
train_losses = []
for batch in tqdm(train_dataset, desc=f"Epoch {epoch+1}/{Config.epochs} [Train]"):
features, labels, feat_lens, label_lens = batch
loss, grad_norm = self.train_step(
features, labels, feat_lens, label_lens
)
train_losses.append(loss.numpy())
avg_train_loss = np.mean(train_losses)
# ======== 验证阶段 ========
val_losses = []
for batch in tqdm(val_dataset, desc=f"Epoch {epoch+1}/{Config.epochs} [Val]"):
features, labels, feat_lens, label_lens = batch
loss = self.val_step(
features, labels, feat_lens, label_lens
)
val_losses.append(loss.numpy())
avg_val_loss = np.mean(val_losses)
# ======== 日志记录 ========
elapsed = time.time() - start_time
current_lr = self.optimizer.learning_rate.numpy()
print(f"\nEpoch {epoch+1}/{Config.epochs} - "
f"Train Loss: {avg_train_loss:.4f} - "
f"Val Loss: {avg_val_loss:.4f} - "
f"LR: {current_lr:.6f} - "
f"Time: {elapsed:.1f}s")
# TensorBoard 日志
with self.train_summary_writer.as_default():
tf.summary.scalar('loss', avg_train_loss, step=epoch)
with self.val_summary_writer.as_default():
tf.summary.scalar('loss', avg_val_loss, step=epoch)
# ======== 模型保存与学习率衰减 ========
if avg_val_loss < best_val_loss:
best_val_loss = avg_val_loss
no_improve_count = 0
self.save_checkpoint(epoch, avg_val_loss)
print(f" -> 保存最优模型 (val_loss: {avg_val_loss:.4f})")
else:
no_improve_count += 1
print(f" -> 验证集 loss 未改善 ({no_improve_count}/{Config.lr_decay_patience})")
# 学习率衰减
if no_improve_count >= Config.lr_decay_patience:
old_lr = current_lr
new_lr = max(old_lr * Config.lr_decay_factor, 1e-6)
self.optimizer.learning_rate.assign(new_lr)
no_improve_count = 0
print(f" -> 学习率衰减: {old_lr:.6f} -> {new_lr:.6f}")
# ======== 每 5 个 epoch 做一次解码示例 ========
if (epoch + 1) % 5 == 0:
self._decode_samples(val_dataset, idx_to_char, num_samples=3)
# ======== Early Stopping ========
if current_lr <= 1e-6 and no_improve_count >= Config.lr_decay_patience:
print(f"\nEarly stopping at epoch {epoch+1}")
break
print(f"\n训练完成!最优验证集 Loss: {best_val_loss:.4f}")
def _decode_samples(self, dataset, idx_to_char, num_samples=3):
"""解码几条验证样本,直观观察模型效果"""
print("\n ---- 解码示例 ----")
for batch in dataset.take(1):
features, labels, feat_lens, label_lens = batch
logits = self.model(features, training=False)
for j in range(min(num_samples, features.shape[0])):
# Greedy Decode
pred_list = ctc_greedy_decode(
logits[j:j+1], feat_lens[j:j+1], idx_to_char
)
# 真实标签
true_label = indices_to_text(
labels[j][:label_lens[j]].numpy().tolist(), idx_to_char
)
print(f" [样本 {j+1}]")
print(f" 真实: {true_label}")
print(f" 预测: {pred_list[0]}")
print(" ------------------\n")
def save_checkpoint(self, epoch, val_loss):
"""保存模型检查点"""
os.makedirs(Config.checkpoint_dir, exist_ok=True)
path = os.path.join(
Config.checkpoint_dir,
f"model_epoch{epoch+1}_loss{val_loss:.4f}.h5"
)
self.model.save_weights(path)
def load_checkpoint(self, path):
"""加载模型检查点"""
self.model.load_weights(path)
print(f"已加载模型: {path}")
6.3 关于 CTC 训练的几个重要注意事项
- 梯度裁剪(Gradient Clipping)是必需的:CTC 训练初期,由于前向概率和后向概率涉及大量概率值的乘积,数值容易变得极端(非常大或非常小),导致梯度爆炸。`clip_by_global_norm` 将梯度向量的全局范数限制在指定阈值内,是确保训练稳定的关键手段。
- 学习率不宜过大:与常规分类任务不同,CTC 的 loss landscape 更加复杂。建议从 1e-3 或 1e-4 开始,配合学习率衰减策略。如果训练初期 loss 为 NaN 或 Inf,通常是学习率过大导致的。
- blank_index 必须与词表一致:本实现中 blank 位于 index 27(字符集的最后一个位置),这与 TensorFlow `tf.nn.ctc_loss` 的默认行为一致(默认将 `num_classes – 1` 视为 blank)。如果你的词表将 blank 放在 index 0,则必须在 `tf.nn.ctc_loss` 中显式传入 `blank_index=0`,否则训练会完全不收敛但不报任何错误——这是一个极其隐蔽的 bug。
- 输入长度必须大于输出长度:这是 CTC 的硬约束。在 data_pipeline 中我们已经用 `filter` 过滤掉了不满足条件的样本,但在训练中仍需注意:如果某条音频特别短而标签特别长,`tf.nn.ctc_loss` 会返回 Inf 或 NaN。
7. CTC 解码实现
7.1 Greedy Decoding(贪心解码)
回顾 Part 2 中的分析,贪心解码在每个时间步选择概率最大的字符,然后通过映射函数 B(去重 + 去 blank)得到最终标签。虽然不是最优解,但实现简单,常用于训练过程中的快速效果检验。
# src/decode.py
"""
CTC 解码模块
实现了三种解码算法:
1. Greedy Decoding(贪心解码)- 对应 Part 2 中的 Best Path Decoding
2. TensorFlow 内置 Beam Search
3. 自定义 Beam Search(带更详细的过程输出)
"""
import numpy as np
import tensorflow as tf
from src.config import Config
def ctc_greedy_decode(logits, logit_lengths, idx_to_char):
"""
CTC 贪心解码。
对应 Part 2 中介绍的 Best Path Decoding:
1. 在每个时间步选择概率最大的字符 → 得到一条路径 pi
2. 通过映射函数 B 对路径做去重和去 blank → 得到最终标签 z
映射函数 B 的具体操作(回顾 Part 1 第 4 节):
- 先合并相邻的重复字符(如 "aabcc" → "abc")
- 再去除路径中所有的 blank 符号
Args:
logits: 模型输出,shape = (batch, time, vocab_size)
logit_lengths: 有效帧数,shape = (batch,)
idx_to_char: 索引到字符的映射(int -> str)
Returns:
results: 解码结果字符串列表
"""
blank_index = Config.blank_index
# Step 1: 在每个时间步取 argmax → 得到路径
predictions = tf.argmax(logits, axis=-1).numpy()
results = []
for b in range(predictions.shape[0]):
length = logit_lengths[b] if hasattr(logit_lengths, '__getitem__') else logit_lengths.numpy()
if isinstance(length, np.ndarray):
length = int(length[b]) if length.ndim > 0 else int(length)
else:
length = int(length)
seq = predictions[b][:length] # 只取有效部分
# Step 2: 映射函数 B — 去重 + 去 blank
decoded = []
prev_idx = -1
for idx in seq:
idx = int(idx)
if idx != blank_index and idx != prev_idx:
decoded.append(idx)
prev_idx = idx
# 转为文本
text = ''
for idx in decoded:
char = idx_to_char.get(idx, '')
text += char
results.append(text)
return results
7.2 TensorFlow 内置 Beam Search
# src/decode.py(续)
def ctc_beam_search_decode_tf(logits, logit_lengths, idx_to_char,
beam_width=Config.beam_width):
"""
使用 TensorFlow 内置的 CTC Beam Search 解码。
对应 Part 2 第 2 节中介绍的 CTC Beam Search Decoding 算法。
TF 内部实现了完整的 Beam Search 流程,包括 gamma_b / gamma_nb 的递推。
Args:
logits: 模型输出,shape = (batch, time, vocab_size)
logit_lengths: 有效帧数,shape = (batch,)
idx_to_char: 索引到字符的映射
beam_width: 束宽 W
Returns:
results: 解码结果字符串列表
"""
# 转置为 time-major: (time, batch, vocab)
logits_t = tf.transpose(logits, perm=[1, 0, 2])
# 调用 TF 内置的 Beam Search 解码
decoded_sparse_list, log_probs = tf.nn.ctc_beam_search_decoder(
inputs=logits_t,
sequence_length=logit_lengths,
beam_width=beam_width,
top_paths=1 # 只返回概率最高的 1 条路径
)
# decoded_sparse_list[0] 是 SparseTensor,转为 dense
decoded_dense = tf.sparse.to_dense(
decoded_sparse_list[0], default_value=-1
).numpy()
results = []
for b in range(decoded_dense.shape[0]):
text = ''
for idx in decoded_dense[b]:
idx = int(idx)
if idx < 0: # padding(-1)
break
# 注意:ctc_beam_search_decoder 返回的索引已经去除了 blank,
# 并且索引范围是 [0, num_classes-2](不包含 blank)。
# 由于我们的 blank 在最后一个位置(index 27),
# 返回的索引 0-26 与我们的 char 索引一一对应。
char = idx_to_char.get(idx, '')
text += char
results.append(text)
return results
7.3 自定义 Beam Search(完整实现)
为了更好地理解 Part 2 中的算法原理,我们手动实现一个 CTC Beam Search 解码器:
# src/decode.py(续)
def ctc_beam_search_decode_custom(logits_single, logit_length, idx_to_char,
beam_width=Config.beam_width):
"""
自定义 CTC Beam Search 解码(单条样本版本)。
完整实现了 Part 2 第 2 节中的算法流程:
1. 初始化候选集 Omega = {epsilon}(空前缀)
2. 对每个时间步 t:
a. 对 Omega 中每个前缀 l,用 blank 和每个字符 c 分别扩展
b. 正确处理 "扩展字符与末尾字符相同" 的特殊情况
c. 从扩展结果中保留 top-W 个前缀
3. 返回最终概率最高的前缀
关键细节——为什么需要分 gamma_b 和 gamma_nb:
考虑前缀 l = "a",如果当前时间步输出字符也是 "a":
- 如果上一步输出的是 blank,则这个 "a" 是一个新字符 → 前缀变为 "aa"
- 如果上一步输出的不是 blank,则这个 "a" 是重复 → 前缀仍为 "a"
因此必须区分"上一步是否输出了 blank",这就是 gamma_b 和 gamma_nb 存在的原因。
Args:
logits_single: 单条样本的模型输出,shape = (time, vocab_size)
logit_length: 有效帧数(int)
idx_to_char: 索引到字符的映射
beam_width: 束宽 W
Returns:
best_text: 最优解码结果字符串
beam: 完整的 beam 信息(dict,用于调试分析)
"""
blank_index = Config.blank_index
T = int(logit_length)
vocab_size = logits_single.shape[-1]
# Softmax: logits → 概率
if isinstance(logits_single, tf.Tensor):
logits_np = logits_single.numpy()
else:
logits_np = logits_single
probs = np.exp(logits_np[:T] - np.max(logits_np[:T], axis=-1, keepdims=True))
probs = probs / np.sum(probs, axis=-1, keepdims=True)
# ============ 初始化 ============
# 每个候选用 (前缀文本, {p_b, p_nb}) 表示
# p_b: 当前时刻末尾为 blank 的概率
# p_nb: 当前时刻末尾为非 blank 字符的概率
beam = {
'': {'p_b': 1.0, 'p_nb': 0.0}
}
# ============ 逐时间步搜索 ============
for t in range(T):
new_beam = {}
for prefix, scores in beam.items():
p_b = scores['p_b']
p_nb = scores['p_nb']
p_total = p_b + p_nb
# ---- 情况 1: 用 blank 扩展 ----
# 输出 blank 不改变前缀,但更新 p_b
new_p_b = p_total * probs[t, blank_index]
if prefix not in new_beam:
new_beam[prefix] = {'p_b': 0.0, 'p_nb': 0.0}
new_beam[prefix]['p_b'] += new_p_b
# ---- 情况 2: 用非 blank 字符扩展 ----
for c_idx in range(vocab_size):
if c_idx == blank_index:
continue
c_char = idx_to_char.get(c_idx, '')
if c_char == '':
continue
prob_c = probs[t, c_idx]
new_prefix = prefix + c_char
if prefix and c_char == prefix[-1]:
# 特殊情况:扩展字符与末尾字符相同
# (a) 上一步是 blank → 新字符 → 前缀变为 l + c
if new_prefix not in new_beam:
new_beam[new_prefix] = {'p_b': 0.0, 'p_nb': 0.0}
new_beam[new_prefix]['p_nb'] += p_b * prob_c
# (b) 上一步不是 blank → 重复 → 前缀不变(仍为 l)
if prefix not in new_beam:
new_beam[prefix] = {'p_b': 0.0, 'p_nb': 0.0}
new_beam[prefix]['p_nb'] += p_nb * prob_c
else:
# 普通情况:扩展字符与末尾字符不同
if new_prefix not in new_beam:
new_beam[new_prefix] = {'p_b': 0.0, 'p_nb': 0.0}
new_beam[new_prefix]['p_nb'] += p_total * prob_c
# ---- 剪枝:保留 top-W ----
sorted_beam = sorted(
new_beam.items(),
key=lambda x: x[1]['p_b'] + x[1]['p_nb'],
reverse=True
)[:beam_width]
beam = dict(sorted_beam)
# ============ 返回最优结果 ============
best_prefix = max(
beam.items(),
key=lambda x: x[1]['p_b'] + x[1]['p_nb']
)
return best_prefix[0], beam
8. 模型评估与指标计算
8.1 评估指标
语音识别系统最常用的评估指标有两个:
CER(Character Error Rate,字符错误率):在字符级别计算编辑距离
CER = (S + D + I) / N,其中 S = 替换数,D = 删除数,I = 插入数,N = 参考文本的字符数。
WER(Word Error Rate,词错误率):在词级别计算编辑距离,是语音识别领域最主流的评估指标。
WER = (S + D + I) / N,公式相同,但 S/D/I/N 的单位是”词”而非”字符”。
8.2 评估代码
# src/evaluate.py
"""
模型评估模块
使用编辑距离(Levenshtein Distance)计算 CER 和 WER。
"""
import numpy as np
from src.config import Config
from src.data_pipeline import indices_to_text
def levenshtein_distance(ref, hyp):
"""
计算两个序列之间的编辑距离(动态规划实现)。
Args:
ref: 参考序列(list)
hyp: 假设序列(list)
Returns:
distance: 编辑距离
n_ref: 参考序列长度
"""
m, n = len(ref), len(hyp)
dp = np.zeros((m + 1, n + 1), dtype=np.int32)
for i in range(m + 1):
dp[i][0] = i
for j in range(n + 1):
dp[0][j] = j
for i in range(1, m + 1):
for j in range(1, n + 1):
if ref[i - 1] == hyp[j - 1]:
dp[i][j] = dp[i - 1][j - 1]
else:
dp[i][j] = min(
dp[i - 1][j] + 1, # 删除
dp[i][j - 1] + 1, # 插入
dp[i - 1][j - 1] + 1 # 替换
)
return dp[m][n], m
def compute_cer(references, hypotheses):
"""计算字符错误率"""
total_dist, total_len = 0, 0
for ref, hyp in zip(references, hypotheses):
dist, ref_len = levenshtein_distance(list(ref), list(hyp))
total_dist += dist
total_len += ref_len
return total_dist / max(total_len, 1)
def compute_wer(references, hypotheses):
"""计算词错误率"""
total_dist, total_len = 0, 0
for ref, hyp in zip(references, hypotheses):
dist, ref_len = levenshtein_distance(ref.split(), hyp.split())
total_dist += dist
total_len += ref_len
return total_dist / max(total_len, 1)
def evaluate_model(model, dataset, idx_to_char, decode_fn, num_batches=None):
"""
在数据集上评估模型。
Args:
model: CTC 模型
dataset: tf.data.Dataset
idx_to_char: 索引到字符的映射
decode_fn: 解码函数,签名为 fn(logits, lengths, idx_to_char) -> list[str]
num_batches: 评估的 batch 数(None 表示全部)
Returns:
avg_cer, avg_wer
"""
all_refs = []
all_hyps = []
for i, batch in enumerate(dataset):
if num_batches and i >= num_batches:
break
features, labels, feat_lens, label_lens = batch
logits = model(features, training=False)
# 解码
predictions = decode_fn(logits, feat_lens, idx_to_char)
# 还原真实标签
for j in range(labels.shape[0]):
ref_indices = labels[j][:label_lens[j]].numpy().tolist()
ref_text = indices_to_text(ref_indices, idx_to_char)
hyp_text = predictions[j] if j < len(predictions) else ''
all_refs.append(ref_text)
all_hyps.append(hyp_text)
cer = compute_cer(all_refs, all_hyps)
wer = compute_wer(all_refs, all_hyps)
print(f"\n评估结果:")
print(f" CER: {cer*100:.2f}%")
print(f" WER: {wer*100:.2f}%")
print(f" 评估样本数: {len(all_refs)}")
# 打印几个解码样本
print(f"\n 解码示例:")
for i in range(min(5, len(all_refs))):
print(f" REF: {all_refs[i]}")
print(f" HYP: {all_hyps[i]}")
print()
return cer, wer
9. 完整训练流程与实验结果
9.1 主训练脚本
# main.py
"""
CTC 语音识别系统 - 主训练脚本
运行方式: python main.py
"""
import glob
from src.config import Config
from src.build_vocab import build_vocabulary
from src.data_pipeline import parse_librispeech, create_dataset
from src.train import CTCTrainer
from src.decode import ctc_greedy_decode, ctc_beam_search_decode_tf
from src.evaluate import evaluate_model
def main():
# ============ Step 1: 构建词表 ============
print("=" * 60)
print("Step 1: 构建字符集词表")
print("=" * 60)
char_to_idx, idx_to_char = build_vocabulary()
print(f"字符集大小: {Config.vocab_size}")
# ============ Step 2: 解析数据集 ============
print("\n" + "=" * 60)
print("Step 2: 解析数据集")
print("=" * 60)
train_samples = parse_librispeech(Config.data_dir, "train-clean-100")
val_samples = parse_librispeech(Config.data_dir, "test-clean")
# ============ Step 3: 构建数据 Pipeline ============
print("\n" + "=" * 60)
print("Step 3: 构建数据 Pipeline")
print("=" * 60)
train_dataset = create_dataset(train_samples, char_to_idx, is_training=True)
val_dataset = create_dataset(val_samples, char_to_idx, is_training=False)
print("数据 Pipeline 构建完成")
# ============ Step 4: 训练模型 ============
print("\n" + "=" * 60)
print("Step 4: 开始训练")
print("=" * 60)
trainer = CTCTrainer()
trainer.model.summary()
trainer.train(train_dataset, val_dataset, idx_to_char)
# ============ Step 5: 最终评估 ============
print("\n" + "=" * 60)
print("Step 5: 最终评估")
print("=" * 60)
# 加载最优模型
ckpts = sorted(glob.glob(f"{Config.checkpoint_dir}/model_*.h5"))
if ckpts:
trainer.load_checkpoint(ckpts[-1])
# Greedy Decode 评估
print("\n--- Greedy Decoding ---")
cer_greedy, wer_greedy = evaluate_model(
trainer.model, val_dataset, idx_to_char, ctc_greedy_decode
)
# Beam Search 评估
print("\n--- Beam Search Decoding (W=25) ---")
cer_beam, wer_beam = evaluate_model(
trainer.model, val_dataset, idx_to_char, ctc_beam_search_decode_tf
)
# ============ 结果汇总 ============
print("\n" + "=" * 60)
print("实验结果汇总")
print("=" * 60)
print(f" Greedy Decoding - CER: {cer_greedy*100:.2f}% WER: {wer_greedy*100:.2f}%")
print(f" Beam Search (W=25) - CER: {cer_beam*100:.2f}% WER: {wer_beam*100:.2f}%")
if __name__ == "__main__":
main()
9.2 预期实验结果
在 LibriSpeech train-clean-100 上训练约 50-80 个 epoch 后,使用 3 层 BiLSTM(256 units)的模型,预期可以达到以下效果:
| 解码方法 | CER (test-clean) | WER (test-clean) |
| Greedy Decoding | ~15-20% | ~45-55% |
| Beam Search (W=25) | ~13-18% | ~40-50% |
| Beam Search + 外部 LM | ~8-12% | ~25-35% |
几点说明:
- CER 远低于 WER 是正常的——因为即使只错了一个字符,在词级别也算错一整个词。
- Beam Search 比 Greedy 好 验证了 Part 2 中的分析:Beam Search 能搜索到更优的标签序列。
- 加上外部语言模型后提升显著,说明 CTC 的条件独立假设确实限制了模型对输出序列内部依赖的建模能力,语言模型能有效弥补这一不足。
- 以上结果是纯 CTC 模型(无预训练、无数据增强、无外部 LM)的基线水平。当前 SOTA 系统通常使用更大的模型(如 Conformer)、更多的训练数据和更复杂的训练策略。
9.3 训练过程可视化
可以使用 TensorBoard 监控训练过程:
tensorboard --logdir logs/
典型的 CTC 训练 loss 曲线特征:
- 前 5 个 epoch:loss 下降非常快(模型学会了输出 blank 为主的模式)
- 5-30 个 epoch:loss 缓慢但稳定下降(模型开始学习真正的字符输出)
- 30+ 个 epoch:loss 趋于平缓,开始需要学习率衰减来继续优化
10. 常见问题与调优建议
10.1 训练不收敛 / Loss 为 NaN
这是 CTC 训练中最常见的问题。排查步骤:
- 检查 blank_index:确认词表中 blank 的索引与 `tf.nn.ctc_loss` 的约定一致。本实现中 blank = index 27(最后一个位置),是 TF 默认值,无需额外指定。
- 降低学习率:从 1e-4 甚至 1e-5 开始尝试。
- 检查输入长度:确保每条样本的 `feature_length > label_length`。
- 检查特征标准化:确保输入特征已做零均值单位方差标准化。
- 检查是否重复 Softmax:模型输出必须是 logits,不能手动加 Softmax。
10.2 模型只输出 blank
训练初期模型可能陷入”全输出 blank”的局部最优。这是因为 blank 是”安全”的输出——无论标签是什么,输出全 blank 也不会产生太大的 loss(只是不够好)。解决方法:
- 使用更小的初始学习率做预热(warmup)
- 确保训练数据的多样性(shuffle)
- 适当增加训练数据量
10.3 效果提升方向
如果基线效果不理想,可以从以下方向优化:
- 数据增强:速度扰动(Speed Perturbation)、SpecAugment(时间和频率掩码)、添加环境噪声
- 模型升级:将 LSTM 替换为 Conformer(卷积 + Transformer),这是当前语音识别的主流架构
- 更多训练数据:使用 LibriSpeech 960 小时或更大的数据集
- 外部语言模型:在 Beam Search 中融合 n-gram 或 Transformer 语言模型
- 学习率策略:使用 warmup + cosine annealing 等更精细的策略
- 混合精度训练:使用 FP16 加速训练并增大 batch size
11. 总结与展望
本篇作为 CTC 系列的第三部分,将 Part 1 和 Part 2 中的理论知识转化为了可运行的完整代码实现。我们从零开始搭建了一个基于 TensorFlow 的端到端 CTC 语音识别系统,覆盖了从数据预处理、模型构建、CTC Loss 训练到 Beam Search 解码和模型评估的完整流程。
回顾一下全系列的脉络:
– Part 1 解决了”如何训练”的问题:定义了 CTC Loss 函数,利用 Forward-Backward 动态规划算法高效计算 Loss 及其梯度,实现端到端训练。对应到本篇代码中的 `compute_ctc_loss` 函数和 `train_step` 方法。
– Part 2 解决了”如何解码”的问题:介绍了 Greedy Decoding 和 Beam Search Decoding 两种算法。对应到本篇代码中的 `ctc_greedy_decode` 和 `ctc_beam_search_decode_custom` 函数。
– Part 3(本篇) 解决了”如何实现”的问题:将理论公式转化为工程代码,并提供了完整的训练与评估流水线。
尽管 CTC 算法已经提出近 20 年,它至今仍在语音识别领域发挥着重要作用。现代的语音识别系统(如 Whisper、Conformer-Transducer 等)虽然在架构上有所演进,但许多核心思想仍然可以追溯到 CTC——端到端训练的范式、序列级别的损失函数、动态规划的求解思路等。深入理解 CTC 算法,不仅有助于掌握语音识别的基础,也为理解更前沿的技术提供了坚实的理论基石。
希望通过这三篇文章,读者能够对 CTC 算法建立起从理论到实践的完整认知。
References
- Graves et al., Connectionist Temporal Classification: Labelling Unsegmented Sequence Data with RNNs. In ICML, 2006. (Graves提出CTC算法的原始论文)
- Graves et al., A Novel Connectionist System for Unconstrained Handwriting Recognition. In IEEE Transactions on PAML, 2009.(CTC算法在手写字识别中的应用)
- Graves et al., Towards End-to-End Recognition with RNNs. In JMLR, 2014.(CTC算法在端到端声学模型中的应用)
- Alex Graves, Supervised Sequence Labelling with Recurrent Neural Networks. In Studies in Computational Intelligence, Springer, 2012.( Graves 的博士论文,关于sequence learning的研究,主要是CTC)
- Watanabe et al., Hybrid CTC/Attention Architecture for End-to-End Speech Recognition. IEEE Journal of Selected Topics in Signal Processing, 2017.(CTC/Attention 联合解码的代表性工作)
- Miao et al., EESEN: End-to-End Speech Recognition using Deep RNN Models and WFST-Based Decoding. In ASRU, 2015.(基于 WFST 的 CTC 解码框架)
- TensorFlow CTC API 文档:https://www.tensorflow.org/api_docs/python/tf/nn/ctc_loss
- Librosa 文档:https://librosa.org/doc/latest/index.html
Leave a Reply
You must be logged in to post a comment.